mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-27 18:10:37 +00:00
Compare commits
1 Commits
problame/b
...
arthur/rep
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f1fcb4d0d7 |
@@ -24,3 +24,4 @@
|
||||
!storage_controller/
|
||||
!vendor/postgres-*/
|
||||
!workspace_hack/
|
||||
!debug-oom/
|
||||
|
||||
47
.github/workflows/build_and_test.yml
vendored
47
.github/workflows/build_and_test.yml
vendored
@@ -728,6 +728,30 @@ jobs:
|
||||
tags: |
|
||||
neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{needs.tag.outputs.build-tag}}-${{ matrix.version.debian }}-${{ matrix.arch }}
|
||||
|
||||
- name: Build compute-tools image
|
||||
# compute-tools are Postgres independent, so build it only once
|
||||
# We pick 16, because that builds on debian 11 with older glibc (and is
|
||||
# thus compatible with newer glibc), rather than 17 on Debian 12, as
|
||||
# that isn't guaranteed to be compatible with Debian 11
|
||||
if: matrix.version.pg == 'v16'
|
||||
uses: docker/build-push-action@v6
|
||||
with:
|
||||
target: compute-tools-image
|
||||
context: .
|
||||
build-args: |
|
||||
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
|
||||
BUILD_TAG=${{ needs.tag.outputs.build-tag }}
|
||||
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}-${{ matrix.version.debian }}
|
||||
DEBIAN_VERSION=${{ matrix.version.debian }}
|
||||
provenance: false
|
||||
push: true
|
||||
pull: true
|
||||
file: compute/compute-node.Dockerfile
|
||||
cache-from: type=registry,ref=cache.neon.build/compute-node-${{ matrix.version.pg }}:cache-${{ matrix.version.debian }}-${{ matrix.arch }}
|
||||
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/compute-tools-{0}:cache-{1}-{2},mode=max', matrix.version.pg, matrix.version.debian, matrix.arch) || '' }}
|
||||
tags: |
|
||||
neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-${{ matrix.arch }}
|
||||
|
||||
compute-node-image:
|
||||
needs: [ compute-node-image-arch, tag ]
|
||||
permissions:
|
||||
@@ -770,6 +794,14 @@ jobs:
|
||||
neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-x64 \
|
||||
neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-arm64
|
||||
|
||||
- name: Create multi-arch compute-tools image
|
||||
if: matrix.version.pg == 'v16'
|
||||
run: |
|
||||
docker buildx imagetools create -t neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }} \
|
||||
-t neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }} \
|
||||
neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-x64 \
|
||||
neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-arm64
|
||||
|
||||
- name: Configure AWS credentials
|
||||
uses: aws-actions/configure-aws-credentials@v4
|
||||
with:
|
||||
@@ -785,6 +817,12 @@ jobs:
|
||||
docker buildx imagetools create -t 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }} \
|
||||
neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}
|
||||
|
||||
- name: Push multi-arch compute-tools image to ECR
|
||||
if: matrix.version.pg == 'v16'
|
||||
run: |
|
||||
docker buildx imagetools create -t 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{ needs.tag.outputs.build-tag }} \
|
||||
neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}
|
||||
|
||||
vm-compute-node-image:
|
||||
needs: [ check-permissions, tag, compute-node-image ]
|
||||
runs-on: [ self-hosted, large ]
|
||||
@@ -963,6 +1001,9 @@ jobs:
|
||||
docker buildx imagetools create -t $repo/neon:latest \
|
||||
$repo/neon:${{ needs.tag.outputs.build-tag }}
|
||||
|
||||
docker buildx imagetools create -t $repo/compute-tools:latest \
|
||||
$repo/compute-tools:${{ needs.tag.outputs.build-tag }}
|
||||
|
||||
for version in ${VERSIONS}; do
|
||||
docker buildx imagetools create -t $repo/compute-node-${version}:latest \
|
||||
$repo/compute-node-${version}:${{ needs.tag.outputs.build-tag }}
|
||||
@@ -991,7 +1032,7 @@ jobs:
|
||||
- name: Copy all images to prod ECR
|
||||
if: github.ref_name == 'release' || github.ref_name == 'release-proxy' || github.ref_name == 'release-compute'
|
||||
run: |
|
||||
for image in neon {vm-,}compute-node-{v14,v15,v16,v17}; do
|
||||
for image in neon compute-tools {vm-,}compute-node-{v14,v15,v16,v17}; do
|
||||
docker buildx imagetools create -t 093970136003.dkr.ecr.eu-central-1.amazonaws.com/${image}:${{ needs.tag.outputs.build-tag }} \
|
||||
369495373322.dkr.ecr.eu-central-1.amazonaws.com/${image}:${{ needs.tag.outputs.build-tag }}
|
||||
done
|
||||
@@ -1003,7 +1044,7 @@ jobs:
|
||||
with:
|
||||
client_id: ${{ vars.AZURE_DEV_CLIENT_ID }}
|
||||
image_tag: ${{ needs.tag.outputs.build-tag }}
|
||||
images: neon vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 vm-compute-node-v17 compute-node-v14 compute-node-v15 compute-node-v16 compute-node-v17
|
||||
images: neon compute-tools vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 vm-compute-node-v17 compute-node-v14 compute-node-v15 compute-node-v16 compute-node-v17
|
||||
registry_name: ${{ vars.AZURE_DEV_REGISTRY_NAME }}
|
||||
subscription_id: ${{ vars.AZURE_DEV_SUBSCRIPTION_ID }}
|
||||
tenant_id: ${{ vars.AZURE_TENANT_ID }}
|
||||
@@ -1015,7 +1056,7 @@ jobs:
|
||||
with:
|
||||
client_id: ${{ vars.AZURE_PROD_CLIENT_ID }}
|
||||
image_tag: ${{ needs.tag.outputs.build-tag }}
|
||||
images: neon vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 vm-compute-node-v17 compute-node-v14 compute-node-v15 compute-node-v16 compute-node-v17
|
||||
images: neon compute-tools vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 vm-compute-node-v17 compute-node-v14 compute-node-v15 compute-node-v16 compute-node-v17
|
||||
registry_name: ${{ vars.AZURE_PROD_REGISTRY_NAME }}
|
||||
subscription_id: ${{ vars.AZURE_PROD_SUBSCRIPTION_ID }}
|
||||
tenant_id: ${{ vars.AZURE_TENANT_ID }}
|
||||
|
||||
220
Cargo.lock
generated
220
Cargo.lock
generated
@@ -718,13 +718,13 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "axum"
|
||||
version = "0.7.9"
|
||||
version = "0.7.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
|
||||
checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"axum-core",
|
||||
"base64 0.22.1",
|
||||
"base64 0.21.1",
|
||||
"bytes",
|
||||
"futures-util",
|
||||
"http 1.1.0",
|
||||
@@ -746,8 +746,8 @@ dependencies = [
|
||||
"sha1",
|
||||
"sync_wrapper 1.0.1",
|
||||
"tokio",
|
||||
"tokio-tungstenite 0.24.0",
|
||||
"tower 0.5.2",
|
||||
"tokio-tungstenite",
|
||||
"tower",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
@@ -1267,7 +1267,6 @@ dependencies = [
|
||||
"aws-config",
|
||||
"aws-sdk-kms",
|
||||
"aws-sdk-s3",
|
||||
"axum",
|
||||
"base64 0.13.1",
|
||||
"bytes",
|
||||
"camino",
|
||||
@@ -1278,7 +1277,7 @@ dependencies = [
|
||||
"fail",
|
||||
"flate2",
|
||||
"futures",
|
||||
"http 1.1.0",
|
||||
"hyper 0.14.30",
|
||||
"metrics",
|
||||
"nix 0.27.1",
|
||||
"notify",
|
||||
@@ -1304,8 +1303,6 @@ dependencies = [
|
||||
"tokio-postgres",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"tower 0.5.2",
|
||||
"tower-http",
|
||||
"tracing",
|
||||
"tracing-opentelemetry",
|
||||
"tracing-subscriber",
|
||||
@@ -1653,20 +1650,6 @@ dependencies = [
|
||||
"parking_lot_core 0.9.8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dashmap"
|
||||
version = "6.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"crossbeam-utils",
|
||||
"hashbrown 0.14.5",
|
||||
"lock_api",
|
||||
"once_cell",
|
||||
"parking_lot_core 0.9.8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "data-encoding"
|
||||
version = "2.4.0"
|
||||
@@ -1966,15 +1949,6 @@ dependencies = [
|
||||
"syn 2.0.90",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "env_filter"
|
||||
version = "0.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0"
|
||||
dependencies = [
|
||||
"log",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "env_logger"
|
||||
version = "0.10.2"
|
||||
@@ -1988,16 +1962,6 @@ dependencies = [
|
||||
"termcolor",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "env_logger"
|
||||
version = "0.11.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6c012a26a7f605efc424dd53697843a72be7dc86ad2d01f7814337794a12231d"
|
||||
dependencies = [
|
||||
"env_filter",
|
||||
"log",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "equator"
|
||||
version = "0.2.2"
|
||||
@@ -2756,7 +2720,7 @@ dependencies = [
|
||||
"pin-project-lite",
|
||||
"socket2",
|
||||
"tokio",
|
||||
"tower 0.4.13",
|
||||
"tower",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
]
|
||||
@@ -2981,28 +2945,6 @@ dependencies = [
|
||||
"str_stack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "inferno"
|
||||
version = "0.12.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "75a5d75fee4d36809e6b021e4b96b686e763d365ffdb03af2bd00786353f84fe"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
"clap",
|
||||
"crossbeam-channel",
|
||||
"crossbeam-utils",
|
||||
"dashmap 6.1.0",
|
||||
"env_logger 0.11.2",
|
||||
"indexmap 2.0.1",
|
||||
"itoa",
|
||||
"log",
|
||||
"num-format",
|
||||
"once_cell",
|
||||
"quick-xml 0.37.1",
|
||||
"rgb",
|
||||
"str_stack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "inotify"
|
||||
version = "0.9.6"
|
||||
@@ -3210,7 +3152,7 @@ version = "0.7.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4644821e1c3d7a560fe13d842d13f587c07348a1a05d3a797152d41c90c56df2"
|
||||
dependencies = [
|
||||
"dashmap 5.5.0",
|
||||
"dashmap",
|
||||
"hashbrown 0.13.2",
|
||||
]
|
||||
|
||||
@@ -3318,9 +3260,9 @@ checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40"
|
||||
|
||||
[[package]]
|
||||
name = "matchit"
|
||||
version = "0.8.4"
|
||||
version = "0.8.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
|
||||
checksum = "540f1c43aed89909c0cc0cc604e3bb2f7e7a341a3728a9e6cfe760e733cd11ed"
|
||||
|
||||
[[package]]
|
||||
name = "md-5"
|
||||
@@ -3748,23 +3690,23 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry"
|
||||
version = "0.27.1"
|
||||
version = "0.26.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ab70038c28ed37b97d8ed414b6429d343a8bbf44c9f79ec854f3a643029ba6d7"
|
||||
checksum = "570074cc999d1a58184080966e5bd3bf3a9a4af650c3b05047c2621e7405cd17"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
"js-sys",
|
||||
"once_cell",
|
||||
"pin-project-lite",
|
||||
"thiserror",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-http"
|
||||
version = "0.27.0"
|
||||
version = "0.26.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "10a8a7f5f6ba7c1b286c2fbca0454eaba116f63bbe69ed250b642d36fbb04d80"
|
||||
checksum = "6351496aeaa49d7c267fb480678d85d1cd30c5edb20b497c48c56f62a8c14b99"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
@@ -3775,9 +3717,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-otlp"
|
||||
version = "0.27.0"
|
||||
version = "0.26.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "91cf61a1868dacc576bf2b2a1c3e9ab150af7272909e80085c3173384fe11f76"
|
||||
checksum = "29e1f9c8b032d4f635c730c0efcf731d5e2530ea13fa8bef7939ddc8420696bd"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"futures-core",
|
||||
@@ -3793,9 +3735,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-proto"
|
||||
version = "0.27.0"
|
||||
version = "0.26.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a6e05acbfada5ec79023c85368af14abd0b307c015e9064d249b2a950ef459a6"
|
||||
checksum = "c9d3968ce3aefdcca5c27e3c4ea4391b37547726a70893aab52d3de95d5f8b34"
|
||||
dependencies = [
|
||||
"opentelemetry",
|
||||
"opentelemetry_sdk",
|
||||
@@ -3805,21 +3747,22 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-semantic-conventions"
|
||||
version = "0.27.0"
|
||||
version = "0.26.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bc1b6902ff63b32ef6c489e8048c5e253e2e4a803ea3ea7e783914536eb15c52"
|
||||
checksum = "db945c1eaea8ac6a9677185357480d215bb6999faa9f691d0c4d4d641eab7a09"
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry_sdk"
|
||||
version = "0.27.1"
|
||||
version = "0.26.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "231e9d6ceef9b0b2546ddf52335785ce41252bc7474ee8ba05bfad277be13ab8"
|
||||
checksum = "d2c627d9f4c9cdc1f21a29ee4bfbd6028fcb8bcf2a857b43f3abdf72c9c862f3"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"futures-channel",
|
||||
"futures-executor",
|
||||
"futures-util",
|
||||
"glob",
|
||||
"once_cell",
|
||||
"opentelemetry",
|
||||
"percent-encoding",
|
||||
"rand 0.8.5",
|
||||
@@ -3827,7 +3770,6 @@ dependencies = [
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4476,7 +4418,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"crc32c",
|
||||
"criterion",
|
||||
"env_logger 0.10.2",
|
||||
"env_logger",
|
||||
"log",
|
||||
"memoffset 0.9.0",
|
||||
"once_cell",
|
||||
@@ -4517,7 +4459,7 @@ dependencies = [
|
||||
"cfg-if",
|
||||
"criterion",
|
||||
"findshlibs",
|
||||
"inferno 0.11.21",
|
||||
"inferno",
|
||||
"libc",
|
||||
"log",
|
||||
"nix 0.26.4",
|
||||
@@ -4552,9 +4494,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
|
||||
|
||||
[[package]]
|
||||
name = "pq-sys"
|
||||
version = "0.6.3"
|
||||
version = "0.4.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f6cc05d7ea95200187117196eee9edd0644424911821aeb28a18ce60ea0b8793"
|
||||
checksum = "31c0052426df997c0cbd30789eb44ca097e3541717a7b8fa36b1c464ee7edebd"
|
||||
dependencies = [
|
||||
"vcpkg",
|
||||
]
|
||||
@@ -4743,9 +4685,9 @@ dependencies = [
|
||||
"clap",
|
||||
"compute_api",
|
||||
"consumption_metrics",
|
||||
"dashmap 5.5.0",
|
||||
"dashmap",
|
||||
"ecdsa 0.16.9",
|
||||
"env_logger 0.10.2",
|
||||
"env_logger",
|
||||
"fallible-iterator",
|
||||
"flate2",
|
||||
"framed-websockets",
|
||||
@@ -4816,7 +4758,7 @@ dependencies = [
|
||||
"tokio-postgres",
|
||||
"tokio-postgres2",
|
||||
"tokio-rustls 0.26.0",
|
||||
"tokio-tungstenite 0.21.0",
|
||||
"tokio-tungstenite",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
@@ -4852,15 +4794,6 @@ dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quick-xml"
|
||||
version = "0.37.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f22f29bdff3987b4d8632ef95fd6424ec7e4e0a57e2f4fc63e489e75357f6a03"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quote"
|
||||
version = "1.0.37"
|
||||
@@ -5245,15 +5178,15 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "reqwest-tracing"
|
||||
version = "0.5.5"
|
||||
version = "0.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "73e6153390585f6961341b50e5a1931d6be6dee4292283635903c26ef9d980d2"
|
||||
checksum = "ff82cf5730a1311fb9413b0bc2b8e743e0157cd73f010ab4ec374a923873b6a2"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"getrandom 0.2.11",
|
||||
"http 1.1.0",
|
||||
"matchit 0.8.4",
|
||||
"matchit 0.8.2",
|
||||
"opentelemetry",
|
||||
"reqwest",
|
||||
"reqwest-middleware",
|
||||
@@ -6867,19 +6800,7 @@ dependencies = [
|
||||
"futures-util",
|
||||
"log",
|
||||
"tokio",
|
||||
"tungstenite 0.21.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-tungstenite"
|
||||
version = "0.24.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9"
|
||||
dependencies = [
|
||||
"futures-util",
|
||||
"log",
|
||||
"tokio",
|
||||
"tungstenite 0.24.0",
|
||||
"tungstenite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -6960,7 +6881,7 @@ dependencies = [
|
||||
"tokio",
|
||||
"tokio-rustls 0.26.0",
|
||||
"tokio-stream",
|
||||
"tower 0.4.13",
|
||||
"tower",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
@@ -7000,50 +6921,17 @@ dependencies = [
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower"
|
||||
version = "0.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"pin-project-lite",
|
||||
"sync_wrapper 1.0.1",
|
||||
"tokio",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower-http"
|
||||
version = "0.6.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "403fa3b783d4b626a8ad51d766ab03cb6d2dbfc46b1c5d4448395e6628dc9697"
|
||||
dependencies = [
|
||||
"bitflags 2.4.1",
|
||||
"bytes",
|
||||
"http 1.1.0",
|
||||
"http-body 1.0.0",
|
||||
"pin-project-lite",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower-layer"
|
||||
version = "0.3.3"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
|
||||
checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0"
|
||||
|
||||
[[package]]
|
||||
name = "tower-service"
|
||||
version = "0.3.3"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
|
||||
checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
|
||||
|
||||
[[package]]
|
||||
name = "tracing"
|
||||
@@ -7112,9 +7000,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tracing-opentelemetry"
|
||||
version = "0.28.0"
|
||||
version = "0.27.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "97a971f6058498b5c0f1affa23e7ea202057a7301dbff68e968b2d578bcbd053"
|
||||
checksum = "dc58af5d3f6c5811462cabb3289aec0093f7338e367e5a33d28c0433b3c7360b"
|
||||
dependencies = [
|
||||
"js-sys",
|
||||
"once_cell",
|
||||
@@ -7198,24 +7086,6 @@ dependencies = [
|
||||
"utf-8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tungstenite"
|
||||
version = "0.24.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"data-encoding",
|
||||
"http 1.1.0",
|
||||
"httparse",
|
||||
"log",
|
||||
"rand 0.8.5",
|
||||
"sha1",
|
||||
"thiserror",
|
||||
"utf-8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "twox-hash"
|
||||
version = "1.6.3"
|
||||
@@ -7383,7 +7253,6 @@ dependencies = [
|
||||
"hex-literal",
|
||||
"humantime",
|
||||
"hyper 0.14.30",
|
||||
"inferno 0.12.0",
|
||||
"itertools 0.10.5",
|
||||
"jemalloc_pprof",
|
||||
"jsonwebtoken",
|
||||
@@ -7487,7 +7356,7 @@ dependencies = [
|
||||
"anyhow",
|
||||
"camino-tempfile",
|
||||
"clap",
|
||||
"env_logger 0.10.2",
|
||||
"env_logger",
|
||||
"log",
|
||||
"postgres",
|
||||
"postgres_ffi",
|
||||
@@ -7998,8 +7867,7 @@ dependencies = [
|
||||
"tokio-util",
|
||||
"toml_edit",
|
||||
"tonic",
|
||||
"tower 0.4.13",
|
||||
"tower 0.5.2",
|
||||
"tower",
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
"url",
|
||||
|
||||
19
Cargo.toml
19
Cargo.toml
@@ -65,7 +65,7 @@ aws-smithy-types = "1.2"
|
||||
aws-credential-types = "1.2.0"
|
||||
aws-sigv4 = { version = "1.2", features = ["sign-http"] }
|
||||
aws-types = "1.3"
|
||||
axum = { version = "0.7.9", features = ["ws"] }
|
||||
axum = { version = "0.7.5", features = ["ws"] }
|
||||
base64 = "0.13.0"
|
||||
bincode = "1.3"
|
||||
bindgen = "0.70"
|
||||
@@ -110,7 +110,6 @@ hyper-util = "0.1"
|
||||
tokio-tungstenite = "0.21.0"
|
||||
indexmap = "2"
|
||||
indoc = "2"
|
||||
inferno = "0.12.0"
|
||||
ipnet = "2.10.0"
|
||||
itertools = "0.10"
|
||||
itoa = "1.0.11"
|
||||
@@ -127,10 +126,10 @@ notify = "6.0.0"
|
||||
num_cpus = "1.15"
|
||||
num-traits = "0.2.15"
|
||||
once_cell = "1.13"
|
||||
opentelemetry = "0.27"
|
||||
opentelemetry_sdk = "0.27"
|
||||
opentelemetry-otlp = { version = "0.27", default-features = false, features = ["http-proto", "trace", "http", "reqwest-client"] }
|
||||
opentelemetry-semantic-conventions = "0.27"
|
||||
opentelemetry = "0.26"
|
||||
opentelemetry_sdk = "0.26"
|
||||
opentelemetry-otlp = { version = "0.26", default-features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
|
||||
opentelemetry-semantic-conventions = "0.26"
|
||||
parking_lot = "0.12"
|
||||
parquet = { version = "53", default-features = false, features = ["zstd"] }
|
||||
parquet_derive = "53"
|
||||
@@ -144,7 +143,7 @@ rand = "0.8"
|
||||
redis = { version = "0.25.2", features = ["tokio-rustls-comp", "keep-alive"] }
|
||||
regex = "1.10.2"
|
||||
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] }
|
||||
reqwest-tracing = { version = "0.5", features = ["opentelemetry_0_27"] }
|
||||
reqwest-tracing = { version = "0.5", features = ["opentelemetry_0_26"] }
|
||||
reqwest-middleware = "0.4"
|
||||
reqwest-retry = "0.7"
|
||||
routerify = "3"
|
||||
@@ -188,12 +187,10 @@ tokio-util = { version = "0.7.10", features = ["io", "rt"] }
|
||||
toml = "0.8"
|
||||
toml_edit = "0.22"
|
||||
tonic = {version = "0.12.3", features = ["tls", "tls-roots"]}
|
||||
tower = { version = "0.5.2", default-features = false }
|
||||
tower-http = { version = "0.6.2", features = ["request-id", "trace"] }
|
||||
tower-service = "0.3.3"
|
||||
tower-service = "0.3.2"
|
||||
tracing = "0.1"
|
||||
tracing-error = "0.2"
|
||||
tracing-opentelemetry = "0.28"
|
||||
tracing-opentelemetry = "0.27"
|
||||
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] }
|
||||
try-lock = "0.2.5"
|
||||
twox-hash = { version = "1.6.3", default-features = false }
|
||||
|
||||
@@ -258,7 +258,7 @@ WORKDIR /home/nonroot
|
||||
|
||||
# Rust
|
||||
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
|
||||
ENV RUSTC_VERSION=1.84.0
|
||||
ENV RUSTC_VERSION=1.83.0
|
||||
ENV RUSTUP_HOME="/home/nonroot/.rustup"
|
||||
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
|
||||
ARG RUSTFILT_VERSION=0.2.1
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -15,7 +15,6 @@ aws-config.workspace = true
|
||||
aws-sdk-s3.workspace = true
|
||||
aws-sdk-kms.workspace = true
|
||||
anyhow.workspace = true
|
||||
axum = { workspace = true, features = [] }
|
||||
camino.workspace = true
|
||||
chrono.workspace = true
|
||||
cfg-if.workspace = true
|
||||
@@ -23,7 +22,7 @@ clap.workspace = true
|
||||
fail.workspace = true
|
||||
flate2.workspace = true
|
||||
futures.workspace = true
|
||||
http.workspace = true
|
||||
hyper0 = { workspace = true, features = ["full"] }
|
||||
metrics.workspace = true
|
||||
nix.workspace = true
|
||||
notify.workspace = true
|
||||
@@ -38,8 +37,6 @@ serde_with.workspace = true
|
||||
serde_json.workspace = true
|
||||
signal-hook.workspace = true
|
||||
tar.workspace = true
|
||||
tower.workspace = true
|
||||
tower-http.workspace = true
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
|
||||
tokio-postgres.workspace = true
|
||||
|
||||
@@ -60,7 +60,7 @@ use compute_tools::compute::{
|
||||
};
|
||||
use compute_tools::configurator::launch_configurator;
|
||||
use compute_tools::extension_server::get_pg_version_string;
|
||||
use compute_tools::http::launch_http_server;
|
||||
use compute_tools::http::api::launch_http_server;
|
||||
use compute_tools::logger::*;
|
||||
use compute_tools::monitor::launch_monitor;
|
||||
use compute_tools::params::*;
|
||||
@@ -68,6 +68,7 @@ use compute_tools::spec::*;
|
||||
use compute_tools::swap::resize_swap;
|
||||
use rlimit::{setrlimit, Resource};
|
||||
use utils::failpoint_support;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
// this is an arbitrary build tag. Fine as a default / for testing purposes
|
||||
// in-case of not-set environment var
|
||||
@@ -87,9 +88,9 @@ fn main() -> Result<()> {
|
||||
|
||||
let cli_args = process_cli(&clap_args)?;
|
||||
|
||||
let cli_spec = try_spec_from_cli(&clap_args, &cli_args)?;
|
||||
// let cli_spec = try_spec_from_cli(&clap_args, &cli_args)?;
|
||||
|
||||
let wait_spec_result = wait_spec(build_tag, cli_args, cli_spec)?;
|
||||
let wait_spec_result = wait_spec(build_tag, cli_args)?;
|
||||
|
||||
start_postgres(&clap_args, wait_spec_result)?
|
||||
|
||||
@@ -111,6 +112,11 @@ fn main() -> Result<()> {
|
||||
fn init() -> Result<(String, clap::ArgMatches)> {
|
||||
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
|
||||
|
||||
opentelemetry::global::set_error_handler(|err| {
|
||||
tracing::info!("OpenTelemetry error: {err}");
|
||||
})
|
||||
.expect("global error handler lock poisoned");
|
||||
|
||||
let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
|
||||
thread::spawn(move || {
|
||||
for sig in signals.forever() {
|
||||
@@ -308,14 +314,41 @@ fn wait_spec(
|
||||
http_port,
|
||||
..
|
||||
}: ProcessCliResult,
|
||||
CliSpecParams {
|
||||
spec,
|
||||
live_config_allowed,
|
||||
}: CliSpecParams,
|
||||
) -> Result<WaitSpecResult> {
|
||||
let mut new_state = ComputeState::new();
|
||||
let spec_set;
|
||||
|
||||
let live_config_allowed = true;
|
||||
|
||||
let spec = Some(ComputeSpec {
|
||||
// format_version: todo!(),
|
||||
// operation_uuid: todo!(),
|
||||
// features: todo!(),
|
||||
// swap_size_bytes: todo!(),
|
||||
// disk_quota_bytes: todo!(),
|
||||
// disable_lfc_resizing: todo!(),
|
||||
// cluster: todo!(),
|
||||
// delta_operations: todo!(),
|
||||
// skip_pg_catalog_updates: todo!(),
|
||||
// tenant_id: todo!(),
|
||||
// timeline_id: todo!(),
|
||||
// pageserver_connstring: todo!(),
|
||||
// safekeeper_connstrings: todo!(),
|
||||
// mode: todo!(),
|
||||
// storage_auth_token: todo!(),
|
||||
// remote_extensions: todo!(),
|
||||
// pgbouncer_settings: todo!(),
|
||||
// shard_stripe_size: todo!(),
|
||||
// local_proxy_config: todo!(),
|
||||
// reconfigure_concurrency: todo!(),
|
||||
pageserver_connstring: Some("pageserver-1.example.com:5432".to_string()),
|
||||
safekeeper_connstrings: vec!["safekeeper-1.example.com:5432".to_string()],
|
||||
tenant_id: Some(TenantId::generate()),
|
||||
timeline_id: Some(TimelineId::generate()),
|
||||
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
if let Some(spec) = spec {
|
||||
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
|
||||
info!("new pspec.spec: {:?}", pspec.spec);
|
||||
@@ -350,9 +383,7 @@ fn wait_spec(
|
||||
// available for binding. Prewarming helps Postgres start quicker later,
|
||||
// because QEMU will already have its memory allocated from the host, and
|
||||
// the necessary binaries will already be cached.
|
||||
if !spec_set {
|
||||
compute.prewarm_postgres()?;
|
||||
}
|
||||
compute.prewarm_postgres()?;
|
||||
|
||||
// Launch http service first, so that we can serve control-plane requests
|
||||
// while configuration is still in progress.
|
||||
@@ -488,10 +519,7 @@ fn start_postgres(
|
||||
let mut pg = None;
|
||||
if !prestartup_failed {
|
||||
pg = match compute.start_compute() {
|
||||
Ok(pg) => {
|
||||
info!(postmaster_pid = %pg.0.id(), "Postgres was started");
|
||||
Some(pg)
|
||||
}
|
||||
Ok(pg) => Some(pg),
|
||||
Err(err) => {
|
||||
error!("could not start the compute node: {:#}", err);
|
||||
compute.set_failed_status(err);
|
||||
@@ -589,8 +617,6 @@ fn wait_postgres(pg: Option<PostgresHandle>) -> Result<WaitPostgresResult> {
|
||||
// propagate to Postgres and it will be shut down as well.
|
||||
let mut exit_code = None;
|
||||
if let Some((mut pg, logs_handle)) = pg {
|
||||
info!(postmaster_pid = %pg.id(), "Waiting for Postgres to exit");
|
||||
|
||||
let ecode = pg
|
||||
.wait()
|
||||
.expect("failed to start waiting on Postgres process");
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
//!
|
||||
//! # Local Testing
|
||||
//!
|
||||
//! - Comment out most of the pgxns in compute-node.Dockerfile to speed up the build.
|
||||
//! - Comment out most of the pgxns in The Dockerfile.compute-tools to speed up the build.
|
||||
//! - Build the image with the following command:
|
||||
//!
|
||||
//! ```bash
|
||||
|
||||
@@ -36,11 +36,11 @@ pub async fn get_dbs_and_roles(compute: &Arc<ComputeNode>) -> anyhow::Result<Cat
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum SchemaDumpError {
|
||||
#[error("database does not exist")]
|
||||
#[error("Database does not exist.")]
|
||||
DatabaseDoesNotExist,
|
||||
#[error("failed to execute pg_dump")]
|
||||
#[error("Failed to execute pg_dump.")]
|
||||
IO(#[from] std::io::Error),
|
||||
#[error("unexpected I/O error")]
|
||||
#[error("Unexpected error.")]
|
||||
Unexpected,
|
||||
}
|
||||
|
||||
|
||||
@@ -358,64 +358,22 @@ impl ComputeNode {
|
||||
let spec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let start_time = Instant::now();
|
||||
|
||||
let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap();
|
||||
let mut config = postgres::Config::from_str(shard0_connstr)?;
|
||||
|
||||
// Use the storage auth token from the config file, if given.
|
||||
// Note: this overrides any password set in the connection string.
|
||||
if let Some(storage_auth_token) = &spec.storage_auth_token {
|
||||
info!("Got storage auth token from spec file");
|
||||
config.password(storage_auth_token);
|
||||
} else {
|
||||
info!("Storage auth token not set");
|
||||
}
|
||||
|
||||
// Connect to pageserver
|
||||
let mut client = config.connect(NoTls)?;
|
||||
let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
|
||||
|
||||
let basebackup_cmd = match lsn {
|
||||
Lsn(0) => {
|
||||
if spec.spec.mode != ComputeMode::Primary {
|
||||
format!(
|
||||
"basebackup {} {} --gzip --replica",
|
||||
spec.tenant_id, spec.timeline_id
|
||||
)
|
||||
} else {
|
||||
format!("basebackup {} {} --gzip", spec.tenant_id, spec.timeline_id)
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
if spec.spec.mode != ComputeMode::Primary {
|
||||
format!(
|
||||
"basebackup {} {} {} --gzip --replica",
|
||||
spec.tenant_id, spec.timeline_id, lsn
|
||||
)
|
||||
} else {
|
||||
format!(
|
||||
"basebackup {} {} {} --gzip",
|
||||
spec.tenant_id, spec.timeline_id, lsn
|
||||
)
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let copyreader = client.copy_out(basebackup_cmd.as_str())?;
|
||||
let mut measured_reader = MeasuredReader::new(copyreader);
|
||||
// Open backup file directly
|
||||
let backup_file = std::fs::File::open("/var/db/backups/backup.tar.gz")?;
|
||||
let mut measured_reader = MeasuredReader::new(backup_file);
|
||||
let mut bufreader = std::io::BufReader::new(&mut measured_reader);
|
||||
|
||||
// Read the archive directly from the `CopyOutReader`
|
||||
// Read the archive directly from the file
|
||||
//
|
||||
// Set `ignore_zeros` so that unpack() reads all the Copy data and
|
||||
// doesn't stop at the end-of-archive marker. Otherwise, if the server
|
||||
// sends an Error after finishing the tarball, we will not notice it.
|
||||
// doesn't stop at the end-of-archive marker.
|
||||
let mut ar = tar::Archive::new(flate2::read::GzDecoder::new(&mut bufreader));
|
||||
ar.set_ignore_zeros(true);
|
||||
ar.unpack(&self.pgdata)?;
|
||||
|
||||
// Report metrics
|
||||
let mut state = self.state.lock().unwrap();
|
||||
state.metrics.pageserver_connect_micros = pageserver_connect_micros;
|
||||
state.metrics.pageserver_connect_micros = 0;
|
||||
state.metrics.basebackup_bytes = measured_reader.get_byte_count() as u64;
|
||||
state.metrics.basebackup_ms = start_time.elapsed().as_millis() as u64;
|
||||
Ok(())
|
||||
@@ -628,32 +586,7 @@ impl ComputeNode {
|
||||
self.http_port,
|
||||
)?;
|
||||
|
||||
// Syncing safekeepers is only safe with primary nodes: if a primary
|
||||
// is already connected it will be kicked out, so a secondary (standby)
|
||||
// cannot sync safekeepers.
|
||||
let lsn = match spec.mode {
|
||||
ComputeMode::Primary => {
|
||||
info!("checking if safekeepers are synced");
|
||||
let lsn = if let Ok(Some(lsn)) = self.check_safekeepers_synced(compute_state) {
|
||||
lsn
|
||||
} else {
|
||||
info!("starting safekeepers syncing");
|
||||
self.sync_safekeepers(pspec.storage_auth_token.clone())
|
||||
.with_context(|| "failed to sync safekeepers")?
|
||||
};
|
||||
info!("safekeepers synced at LSN {}", lsn);
|
||||
lsn
|
||||
}
|
||||
ComputeMode::Static(lsn) => {
|
||||
info!("Starting read-only node at static LSN {}", lsn);
|
||||
lsn
|
||||
}
|
||||
ComputeMode::Replica => {
|
||||
info!("Initializing standby from latest Pageserver LSN");
|
||||
Lsn(0)
|
||||
}
|
||||
};
|
||||
|
||||
let lsn = Lsn(0);
|
||||
info!(
|
||||
"getting basebackup@{} from pageserver {}",
|
||||
lsn, &pspec.pageserver_connstr
|
||||
|
||||
606
compute_tools/src/http/api.rs
Normal file
606
compute_tools/src/http/api.rs
Normal file
@@ -0,0 +1,606 @@
|
||||
use std::convert::Infallible;
|
||||
use std::net::IpAddr;
|
||||
use std::net::Ipv6Addr;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
|
||||
use crate::catalog::SchemaDumpError;
|
||||
use crate::catalog::{get_database_schema, get_dbs_and_roles};
|
||||
use crate::compute::forward_termination_signal;
|
||||
use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
|
||||
use crate::installed_extensions;
|
||||
use compute_api::requests::{ConfigurationRequest, ExtensionInstallRequest, SetRoleGrantsRequest};
|
||||
use compute_api::responses::{
|
||||
ComputeStatus, ComputeStatusResponse, ExtensionInstallResult, GenericAPIError,
|
||||
SetRoleGrantsResponse,
|
||||
};
|
||||
|
||||
use anyhow::Result;
|
||||
use hyper::header::CONTENT_TYPE;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
||||
use metrics::proto::MetricFamily;
|
||||
use metrics::Encoder;
|
||||
use metrics::TextEncoder;
|
||||
use tokio::task;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, warn};
|
||||
use tracing_utils::http::OtelName;
|
||||
use utils::failpoint_support::failpoints_handler;
|
||||
use utils::http::error::ApiError;
|
||||
use utils::http::request::must_get_query_param;
|
||||
|
||||
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
|
||||
ComputeStatusResponse {
|
||||
start_time: state.start_time,
|
||||
tenant: state
|
||||
.pspec
|
||||
.as_ref()
|
||||
.map(|pspec| pspec.tenant_id.to_string()),
|
||||
timeline: state
|
||||
.pspec
|
||||
.as_ref()
|
||||
.map(|pspec| pspec.timeline_id.to_string()),
|
||||
status: state.status,
|
||||
last_active: state.last_active,
|
||||
error: state.error.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
// Service function to handle all available routes.
|
||||
async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body> {
|
||||
//
|
||||
// NOTE: The URI path is currently included in traces. That's OK because
|
||||
// it doesn't contain any variable parts or sensitive information. But
|
||||
// please keep that in mind if you change the routing here.
|
||||
//
|
||||
match (req.method(), req.uri().path()) {
|
||||
// Serialized compute state.
|
||||
(&Method::GET, "/status") => {
|
||||
debug!("serving /status GET request");
|
||||
let state = compute.state.lock().unwrap();
|
||||
let status_response = status_response_from_state(&state);
|
||||
Response::new(Body::from(serde_json::to_string(&status_response).unwrap()))
|
||||
}
|
||||
|
||||
// Startup metrics in JSON format. Keep /metrics reserved for a possible
|
||||
// future use for Prometheus metrics format.
|
||||
(&Method::GET, "/metrics.json") => {
|
||||
info!("serving /metrics.json GET request");
|
||||
let metrics = compute.state.lock().unwrap().metrics.clone();
|
||||
Response::new(Body::from(serde_json::to_string(&metrics).unwrap()))
|
||||
}
|
||||
|
||||
// Prometheus metrics
|
||||
(&Method::GET, "/metrics") => {
|
||||
debug!("serving /metrics GET request");
|
||||
|
||||
// When we call TextEncoder::encode() below, it will immediately
|
||||
// return an error if a metric family has no metrics, so we need to
|
||||
// preemptively filter out metric families with no metrics.
|
||||
let metrics = installed_extensions::collect()
|
||||
.into_iter()
|
||||
.filter(|m| !m.get_metric().is_empty())
|
||||
.collect::<Vec<MetricFamily>>();
|
||||
|
||||
let encoder = TextEncoder::new();
|
||||
let mut buffer = vec![];
|
||||
|
||||
if let Err(err) = encoder.encode(&metrics, &mut buffer) {
|
||||
let msg = format!("error handling /metrics request: {err}");
|
||||
error!(msg);
|
||||
return render_json_error(&msg, StatusCode::INTERNAL_SERVER_ERROR);
|
||||
}
|
||||
|
||||
match Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(CONTENT_TYPE, encoder.format_type())
|
||||
.body(Body::from(buffer))
|
||||
{
|
||||
Ok(response) => response,
|
||||
Err(err) => {
|
||||
let msg = format!("error handling /metrics request: {err}");
|
||||
error!(msg);
|
||||
render_json_error(&msg, StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Collect Postgres current usage insights
|
||||
(&Method::GET, "/insights") => {
|
||||
info!("serving /insights GET request");
|
||||
let status = compute.get_status();
|
||||
if status != ComputeStatus::Running {
|
||||
let msg = format!("compute is not running, current status: {:?}", status);
|
||||
error!(msg);
|
||||
return Response::new(Body::from(msg));
|
||||
}
|
||||
|
||||
let insights = compute.collect_insights().await;
|
||||
Response::new(Body::from(insights))
|
||||
}
|
||||
|
||||
(&Method::POST, "/check_writability") => {
|
||||
info!("serving /check_writability POST request");
|
||||
let status = compute.get_status();
|
||||
if status != ComputeStatus::Running {
|
||||
let msg = format!(
|
||||
"invalid compute status for check_writability request: {:?}",
|
||||
status
|
||||
);
|
||||
error!(msg);
|
||||
return Response::new(Body::from(msg));
|
||||
}
|
||||
|
||||
let res = crate::checker::check_writability(compute).await;
|
||||
match res {
|
||||
Ok(_) => Response::new(Body::from("true")),
|
||||
Err(e) => {
|
||||
error!("check_writability failed: {}", e);
|
||||
Response::new(Body::from(e.to_string()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(&Method::POST, "/extensions") => {
|
||||
info!("serving /extensions POST request");
|
||||
let status = compute.get_status();
|
||||
if status != ComputeStatus::Running {
|
||||
let msg = format!(
|
||||
"invalid compute status for extensions request: {:?}",
|
||||
status
|
||||
);
|
||||
error!(msg);
|
||||
return render_json_error(&msg, StatusCode::PRECONDITION_FAILED);
|
||||
}
|
||||
|
||||
let request = hyper::body::to_bytes(req.into_body()).await.unwrap();
|
||||
let request = serde_json::from_slice::<ExtensionInstallRequest>(&request).unwrap();
|
||||
let res = compute
|
||||
.install_extension(&request.extension, &request.database, request.version)
|
||||
.await;
|
||||
match res {
|
||||
Ok(version) => render_json(Body::from(
|
||||
serde_json::to_string(&ExtensionInstallResult {
|
||||
extension: request.extension,
|
||||
version,
|
||||
})
|
||||
.unwrap(),
|
||||
)),
|
||||
Err(e) => {
|
||||
error!("install_extension failed: {}", e);
|
||||
render_json_error(&e.to_string(), StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(&Method::GET, "/info") => {
|
||||
let num_cpus = num_cpus::get_physical();
|
||||
info!("serving /info GET request. num_cpus: {}", num_cpus);
|
||||
Response::new(Body::from(
|
||||
serde_json::json!({
|
||||
"num_cpus": num_cpus,
|
||||
})
|
||||
.to_string(),
|
||||
))
|
||||
}
|
||||
|
||||
// Accept spec in JSON format and request compute configuration. If
|
||||
// anything goes wrong after we set the compute status to `ConfigurationPending`
|
||||
// and update compute state with new spec, we basically leave compute
|
||||
// in the potentially wrong state. That said, it's control-plane's
|
||||
// responsibility to watch compute state after reconfiguration request
|
||||
// and to clean restart in case of errors.
|
||||
(&Method::POST, "/configure") => {
|
||||
info!("serving /configure POST request");
|
||||
match handle_configure_request(req, compute).await {
|
||||
Ok(msg) => Response::new(Body::from(msg)),
|
||||
Err((msg, code)) => {
|
||||
error!("error handling /configure request: {msg}");
|
||||
render_json_error(&msg, code)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(&Method::POST, "/terminate") => {
|
||||
info!("serving /terminate POST request");
|
||||
match handle_terminate_request(compute).await {
|
||||
Ok(()) => Response::new(Body::empty()),
|
||||
Err((msg, code)) => {
|
||||
error!("error handling /terminate request: {msg}");
|
||||
render_json_error(&msg, code)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(&Method::GET, "/dbs_and_roles") => {
|
||||
info!("serving /dbs_and_roles GET request",);
|
||||
match get_dbs_and_roles(compute).await {
|
||||
Ok(res) => render_json(Body::from(serde_json::to_string(&res).unwrap())),
|
||||
Err(_) => {
|
||||
render_json_error("can't get dbs and roles", StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(&Method::GET, "/database_schema") => {
|
||||
let database = match must_get_query_param(&req, "database") {
|
||||
Err(e) => return e.into_response(),
|
||||
Ok(database) => database,
|
||||
};
|
||||
info!("serving /database_schema GET request with database: {database}",);
|
||||
match get_database_schema(compute, &database).await {
|
||||
Ok(res) => render_plain(Body::wrap_stream(res)),
|
||||
Err(SchemaDumpError::DatabaseDoesNotExist) => {
|
||||
render_json_error("database does not exist", StatusCode::NOT_FOUND)
|
||||
}
|
||||
Err(e) => {
|
||||
error!("can't get schema dump: {}", e);
|
||||
render_json_error("can't get schema dump", StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(&Method::POST, "/grants") => {
|
||||
info!("serving /grants POST request");
|
||||
let status = compute.get_status();
|
||||
if status != ComputeStatus::Running {
|
||||
let msg = format!(
|
||||
"invalid compute status for set_role_grants request: {:?}",
|
||||
status
|
||||
);
|
||||
error!(msg);
|
||||
return render_json_error(&msg, StatusCode::PRECONDITION_FAILED);
|
||||
}
|
||||
|
||||
let request = hyper::body::to_bytes(req.into_body()).await.unwrap();
|
||||
let request = serde_json::from_slice::<SetRoleGrantsRequest>(&request).unwrap();
|
||||
|
||||
let res = compute
|
||||
.set_role_grants(
|
||||
&request.database,
|
||||
&request.schema,
|
||||
&request.privileges,
|
||||
&request.role,
|
||||
)
|
||||
.await;
|
||||
match res {
|
||||
Ok(()) => render_json(Body::from(
|
||||
serde_json::to_string(&SetRoleGrantsResponse {
|
||||
database: request.database,
|
||||
schema: request.schema,
|
||||
role: request.role,
|
||||
privileges: request.privileges,
|
||||
})
|
||||
.unwrap(),
|
||||
)),
|
||||
Err(e) => render_json_error(
|
||||
&format!("could not grant role privileges to the schema: {e}"),
|
||||
// TODO: can we filter on role/schema not found errors
|
||||
// and return appropriate error code?
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
// get the list of installed extensions
|
||||
// currently only used in python tests
|
||||
// TODO: call it from cplane
|
||||
(&Method::GET, "/installed_extensions") => {
|
||||
info!("serving /installed_extensions GET request");
|
||||
let status = compute.get_status();
|
||||
if status != ComputeStatus::Running {
|
||||
let msg = format!(
|
||||
"invalid compute status for extensions request: {:?}",
|
||||
status
|
||||
);
|
||||
error!(msg);
|
||||
return Response::new(Body::from(msg));
|
||||
}
|
||||
|
||||
let conf = compute.get_conn_conf(None);
|
||||
let res =
|
||||
task::spawn_blocking(move || installed_extensions::get_installed_extensions(conf))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
match res {
|
||||
Ok(res) => render_json(Body::from(serde_json::to_string(&res).unwrap())),
|
||||
Err(e) => render_json_error(
|
||||
&format!("could not get list of installed extensions: {}", e),
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
(&Method::POST, "/failpoints") if cfg!(feature = "testing") => {
|
||||
match failpoints_handler(req, CancellationToken::new()).await {
|
||||
Ok(r) => r,
|
||||
Err(ApiError::BadRequest(e)) => {
|
||||
render_json_error(&e.to_string(), StatusCode::BAD_REQUEST)
|
||||
}
|
||||
Err(_) => {
|
||||
render_json_error("Internal server error", StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// download extension files from remote extension storage on demand
|
||||
(&Method::POST, route) if route.starts_with("/extension_server/") => {
|
||||
info!("serving {:?} POST request", route);
|
||||
info!("req.uri {:?}", req.uri());
|
||||
|
||||
// don't even try to download extensions
|
||||
// if no remote storage is configured
|
||||
if compute.ext_remote_storage.is_none() {
|
||||
info!("no extensions remote storage configured");
|
||||
let mut resp = Response::new(Body::from("no remote storage configured"));
|
||||
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
||||
return resp;
|
||||
}
|
||||
|
||||
let mut is_library = false;
|
||||
if let Some(params) = req.uri().query() {
|
||||
info!("serving {:?} POST request with params: {}", route, params);
|
||||
if params == "is_library=true" {
|
||||
is_library = true;
|
||||
} else {
|
||||
let mut resp = Response::new(Body::from("Wrong request parameters"));
|
||||
*resp.status_mut() = StatusCode::BAD_REQUEST;
|
||||
return resp;
|
||||
}
|
||||
}
|
||||
let filename = route.split('/').last().unwrap().to_string();
|
||||
info!("serving /extension_server POST request, filename: {filename:?} is_library: {is_library}");
|
||||
|
||||
// get ext_name and path from spec
|
||||
// don't lock compute_state for too long
|
||||
let ext = {
|
||||
let compute_state = compute.state.lock().unwrap();
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let spec = &pspec.spec;
|
||||
|
||||
// debug only
|
||||
info!("spec: {:?}", spec);
|
||||
|
||||
let remote_extensions = match spec.remote_extensions.as_ref() {
|
||||
Some(r) => r,
|
||||
None => {
|
||||
info!("no remote extensions spec was provided");
|
||||
let mut resp = Response::new(Body::from("no remote storage configured"));
|
||||
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
||||
return resp;
|
||||
}
|
||||
};
|
||||
|
||||
remote_extensions.get_ext(
|
||||
&filename,
|
||||
is_library,
|
||||
&compute.build_tag,
|
||||
&compute.pgversion,
|
||||
)
|
||||
};
|
||||
|
||||
match ext {
|
||||
Ok((ext_name, ext_path)) => {
|
||||
match compute.download_extension(ext_name, ext_path).await {
|
||||
Ok(_) => Response::new(Body::from("OK")),
|
||||
Err(e) => {
|
||||
error!("extension download failed: {}", e);
|
||||
let mut resp = Response::new(Body::from(e.to_string()));
|
||||
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
||||
resp
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("extension download failed to find extension: {}", e);
|
||||
let mut resp = Response::new(Body::from("failed to find file"));
|
||||
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
||||
resp
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Return the `404 Not Found` for any other routes.
|
||||
_ => {
|
||||
let mut not_found = Response::new(Body::from("404 Not Found"));
|
||||
*not_found.status_mut() = StatusCode::NOT_FOUND;
|
||||
not_found
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_configure_request(
|
||||
req: Request<Body>,
|
||||
compute: &Arc<ComputeNode>,
|
||||
) -> Result<String, (String, StatusCode)> {
|
||||
if !compute.live_config_allowed {
|
||||
return Err((
|
||||
"live configuration is not allowed for this compute node".to_string(),
|
||||
StatusCode::PRECONDITION_FAILED,
|
||||
));
|
||||
}
|
||||
|
||||
let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
|
||||
let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap();
|
||||
if let Ok(request) = serde_json::from_str::<ConfigurationRequest>(&spec_raw) {
|
||||
let spec = request.spec;
|
||||
|
||||
let parsed_spec = match ParsedSpec::try_from(spec) {
|
||||
Ok(ps) => ps,
|
||||
Err(msg) => return Err((msg, StatusCode::BAD_REQUEST)),
|
||||
};
|
||||
|
||||
// XXX: wrap state update under lock in code blocks. Otherwise,
|
||||
// we will try to `Send` `mut state` into the spawned thread
|
||||
// bellow, which will cause error:
|
||||
// ```
|
||||
// error: future cannot be sent between threads safely
|
||||
// ```
|
||||
{
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
|
||||
let msg = format!(
|
||||
"invalid compute status for configuration request: {:?}",
|
||||
state.status.clone()
|
||||
);
|
||||
return Err((msg, StatusCode::PRECONDITION_FAILED));
|
||||
}
|
||||
state.pspec = Some(parsed_spec);
|
||||
state.set_status(ComputeStatus::ConfigurationPending, &compute.state_changed);
|
||||
drop(state);
|
||||
info!("set new spec and notified waiters");
|
||||
}
|
||||
|
||||
// Spawn a blocking thread to wait for compute to become Running.
|
||||
// This is needed to do not block the main pool of workers and
|
||||
// be able to serve other requests while some particular request
|
||||
// is waiting for compute to finish configuration.
|
||||
let c = compute.clone();
|
||||
task::spawn_blocking(move || {
|
||||
let mut state = c.state.lock().unwrap();
|
||||
while state.status != ComputeStatus::Running {
|
||||
state = c.state_changed.wait(state).unwrap();
|
||||
info!(
|
||||
"waiting for compute to become Running, current status: {:?}",
|
||||
state.status
|
||||
);
|
||||
|
||||
if state.status == ComputeStatus::Failed {
|
||||
let err = state.error.as_ref().map_or("unknown error", |x| x);
|
||||
let msg = format!("compute configuration failed: {:?}", err);
|
||||
return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
.unwrap()?;
|
||||
|
||||
// Return current compute state if everything went well.
|
||||
let state = compute.state.lock().unwrap().clone();
|
||||
let status_response = status_response_from_state(&state);
|
||||
Ok(serde_json::to_string(&status_response).unwrap())
|
||||
} else {
|
||||
Err(("invalid spec".to_string(), StatusCode::BAD_REQUEST))
|
||||
}
|
||||
}
|
||||
|
||||
fn render_json_error(e: &str, status: StatusCode) -> Response<Body> {
|
||||
let error = GenericAPIError {
|
||||
error: e.to_string(),
|
||||
};
|
||||
Response::builder()
|
||||
.status(status)
|
||||
.header(CONTENT_TYPE, "application/json")
|
||||
.body(Body::from(serde_json::to_string(&error).unwrap()))
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn render_json(body: Body) -> Response<Body> {
|
||||
Response::builder()
|
||||
.header(CONTENT_TYPE, "application/json")
|
||||
.body(body)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn render_plain(body: Body) -> Response<Body> {
|
||||
Response::builder()
|
||||
.header(CONTENT_TYPE, "text/plain")
|
||||
.body(body)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
async fn handle_terminate_request(compute: &Arc<ComputeNode>) -> Result<(), (String, StatusCode)> {
|
||||
{
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
if state.status == ComputeStatus::Terminated {
|
||||
return Ok(());
|
||||
}
|
||||
if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
|
||||
let msg = format!(
|
||||
"invalid compute status for termination request: {}",
|
||||
state.status
|
||||
);
|
||||
return Err((msg, StatusCode::PRECONDITION_FAILED));
|
||||
}
|
||||
state.set_status(ComputeStatus::TerminationPending, &compute.state_changed);
|
||||
drop(state);
|
||||
}
|
||||
|
||||
forward_termination_signal();
|
||||
info!("sent signal and notified waiters");
|
||||
|
||||
// Spawn a blocking thread to wait for compute to become Terminated.
|
||||
// This is needed to do not block the main pool of workers and
|
||||
// be able to serve other requests while some particular request
|
||||
// is waiting for compute to finish configuration.
|
||||
let c = compute.clone();
|
||||
task::spawn_blocking(move || {
|
||||
let mut state = c.state.lock().unwrap();
|
||||
while state.status != ComputeStatus::Terminated {
|
||||
state = c.state_changed.wait(state).unwrap();
|
||||
info!(
|
||||
"waiting for compute to become {}, current status: {:?}",
|
||||
ComputeStatus::Terminated,
|
||||
state.status
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
.unwrap()?;
|
||||
info!("terminated Postgres");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Main Hyper HTTP server function that runs it and blocks waiting on it forever.
|
||||
#[tokio::main]
|
||||
async fn serve(port: u16, state: Arc<ComputeNode>) {
|
||||
// this usually binds to both IPv4 and IPv6 on linux
|
||||
// see e.g. https://github.com/rust-lang/rust/pull/34440
|
||||
let addr = SocketAddr::new(IpAddr::from(Ipv6Addr::UNSPECIFIED), port);
|
||||
|
||||
let make_service = make_service_fn(move |_conn| {
|
||||
let state = state.clone();
|
||||
async move {
|
||||
Ok::<_, Infallible>(service_fn(move |req: Request<Body>| {
|
||||
let state = state.clone();
|
||||
async move {
|
||||
Ok::<_, Infallible>(
|
||||
// NOTE: We include the URI path in the string. It
|
||||
// doesn't contain any variable parts or sensitive
|
||||
// information in this API.
|
||||
tracing_utils::http::tracing_handler(
|
||||
req,
|
||||
|req| routes(req, &state),
|
||||
OtelName::UriPath,
|
||||
)
|
||||
.await,
|
||||
)
|
||||
}
|
||||
}))
|
||||
}
|
||||
});
|
||||
|
||||
info!("starting HTTP server on {}", addr);
|
||||
|
||||
let server = Server::bind(&addr).serve(make_service);
|
||||
|
||||
// Run this server forever
|
||||
if let Err(e) = server.await {
|
||||
error!("server error: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
/// Launch a separate Hyper HTTP API server thread and return its `JoinHandle`.
|
||||
pub fn launch_http_server(port: u16, state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
|
||||
let state = Arc::clone(state);
|
||||
|
||||
Ok(thread::Builder::new()
|
||||
.name("http-endpoint".into())
|
||||
.spawn(move || serve(port, state))?)
|
||||
}
|
||||
@@ -1,48 +0,0 @@
|
||||
use std::ops::{Deref, DerefMut};
|
||||
|
||||
use axum::{
|
||||
async_trait,
|
||||
extract::{rejection::JsonRejection, FromRequest, Request},
|
||||
};
|
||||
use compute_api::responses::GenericAPIError;
|
||||
use http::StatusCode;
|
||||
|
||||
/// Custom `Json` extractor, so that we can format errors into
|
||||
/// `JsonResponse<GenericAPIError>`.
|
||||
#[derive(Debug, Clone, Copy, Default)]
|
||||
pub(crate) struct Json<T>(pub T);
|
||||
|
||||
#[async_trait]
|
||||
impl<S, T> FromRequest<S> for Json<T>
|
||||
where
|
||||
axum::Json<T>: FromRequest<S, Rejection = JsonRejection>,
|
||||
S: Send + Sync,
|
||||
{
|
||||
type Rejection = (StatusCode, axum::Json<GenericAPIError>);
|
||||
|
||||
async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
|
||||
match axum::Json::<T>::from_request(req, state).await {
|
||||
Ok(value) => Ok(Self(value.0)),
|
||||
Err(rejection) => Err((
|
||||
rejection.status(),
|
||||
axum::Json(GenericAPIError {
|
||||
error: rejection.body_text().to_lowercase(),
|
||||
}),
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Deref for Json<T> {
|
||||
type Target = T;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> DerefMut for Json<T> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
@@ -1,7 +0,0 @@
|
||||
pub(crate) mod json;
|
||||
pub(crate) mod path;
|
||||
pub(crate) mod query;
|
||||
|
||||
pub(crate) use json::Json;
|
||||
pub(crate) use path::Path;
|
||||
pub(crate) use query::Query;
|
||||
@@ -1,48 +0,0 @@
|
||||
use std::ops::{Deref, DerefMut};
|
||||
|
||||
use axum::{
|
||||
async_trait,
|
||||
extract::{rejection::PathRejection, FromRequestParts},
|
||||
};
|
||||
use compute_api::responses::GenericAPIError;
|
||||
use http::{request::Parts, StatusCode};
|
||||
|
||||
/// Custom `Path` extractor, so that we can format errors into
|
||||
/// `JsonResponse<GenericAPIError>`.
|
||||
#[derive(Debug, Clone, Copy, Default)]
|
||||
pub(crate) struct Path<T>(pub T);
|
||||
|
||||
#[async_trait]
|
||||
impl<S, T> FromRequestParts<S> for Path<T>
|
||||
where
|
||||
axum::extract::Path<T>: FromRequestParts<S, Rejection = PathRejection>,
|
||||
S: Send + Sync,
|
||||
{
|
||||
type Rejection = (StatusCode, axum::Json<GenericAPIError>);
|
||||
|
||||
async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> {
|
||||
match axum::extract::Path::<T>::from_request_parts(parts, state).await {
|
||||
Ok(value) => Ok(Self(value.0)),
|
||||
Err(rejection) => Err((
|
||||
rejection.status(),
|
||||
axum::Json(GenericAPIError {
|
||||
error: rejection.body_text().to_ascii_lowercase(),
|
||||
}),
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Deref for Path<T> {
|
||||
type Target = T;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> DerefMut for Path<T> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
@@ -1,48 +0,0 @@
|
||||
use std::ops::{Deref, DerefMut};
|
||||
|
||||
use axum::{
|
||||
async_trait,
|
||||
extract::{rejection::QueryRejection, FromRequestParts},
|
||||
};
|
||||
use compute_api::responses::GenericAPIError;
|
||||
use http::{request::Parts, StatusCode};
|
||||
|
||||
/// Custom `Query` extractor, so that we can format errors into
|
||||
/// `JsonResponse<GenericAPIError>`.
|
||||
#[derive(Debug, Clone, Copy, Default)]
|
||||
pub(crate) struct Query<T>(pub T);
|
||||
|
||||
#[async_trait]
|
||||
impl<S, T> FromRequestParts<S> for Query<T>
|
||||
where
|
||||
axum::extract::Query<T>: FromRequestParts<S, Rejection = QueryRejection>,
|
||||
S: Send + Sync,
|
||||
{
|
||||
type Rejection = (StatusCode, axum::Json<GenericAPIError>);
|
||||
|
||||
async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> {
|
||||
match axum::extract::Query::<T>::from_request_parts(parts, state).await {
|
||||
Ok(value) => Ok(Self(value.0)),
|
||||
Err(rejection) => Err((
|
||||
rejection.status(),
|
||||
axum::Json(GenericAPIError {
|
||||
error: rejection.body_text().to_ascii_lowercase(),
|
||||
}),
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Deref for Query<T> {
|
||||
type Target = T;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> DerefMut for Query<T> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
@@ -1,56 +1 @@
|
||||
use axum::{body::Body, response::Response};
|
||||
use compute_api::responses::{ComputeStatus, GenericAPIError};
|
||||
use http::{header::CONTENT_TYPE, StatusCode};
|
||||
use serde::Serialize;
|
||||
use tracing::error;
|
||||
|
||||
pub use server::launch_http_server;
|
||||
|
||||
mod extract;
|
||||
mod routes;
|
||||
mod server;
|
||||
|
||||
/// Convenience response builder for JSON responses
|
||||
struct JsonResponse;
|
||||
|
||||
impl JsonResponse {
|
||||
/// Helper for actually creating a response
|
||||
fn create_response(code: StatusCode, body: impl Serialize) -> Response {
|
||||
Response::builder()
|
||||
.status(code)
|
||||
.header(CONTENT_TYPE.as_str(), "application/json")
|
||||
.body(Body::from(serde_json::to_string(&body).unwrap()))
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
/// Create a successful error response
|
||||
pub(self) fn success(code: StatusCode, body: impl Serialize) -> Response {
|
||||
assert!({
|
||||
let code = code.as_u16();
|
||||
|
||||
(200..300).contains(&code)
|
||||
});
|
||||
|
||||
Self::create_response(code, body)
|
||||
}
|
||||
|
||||
/// Create an error response
|
||||
pub(self) fn error(code: StatusCode, error: impl ToString) -> Response {
|
||||
assert!(code.as_u16() >= 400);
|
||||
|
||||
let message = error.to_string();
|
||||
error!(message);
|
||||
|
||||
Self::create_response(code, &GenericAPIError { error: message })
|
||||
}
|
||||
|
||||
/// Create an error response related to the compute being in an invalid state
|
||||
pub(self) fn invalid_status(status: ComputeStatus) -> Response {
|
||||
Self::create_response(
|
||||
StatusCode::PRECONDITION_FAILED,
|
||||
&GenericAPIError {
|
||||
error: format!("invalid compute status: {status}"),
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
pub mod api;
|
||||
|
||||
@@ -37,7 +37,7 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ComputeMetrics"
|
||||
|
||||
/metrics:
|
||||
/metrics
|
||||
get:
|
||||
tags:
|
||||
- Info
|
||||
|
||||
@@ -1,20 +0,0 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::{extract::State, response::Response};
|
||||
use compute_api::responses::ComputeStatus;
|
||||
use http::StatusCode;
|
||||
|
||||
use crate::{checker::check_writability, compute::ComputeNode, http::JsonResponse};
|
||||
|
||||
/// Check that the compute is currently running.
|
||||
pub(in crate::http) async fn is_writable(State(compute): State<Arc<ComputeNode>>) -> Response {
|
||||
let status = compute.get_status();
|
||||
if status != ComputeStatus::Running {
|
||||
return JsonResponse::invalid_status(status);
|
||||
}
|
||||
|
||||
match check_writability(&compute).await {
|
||||
Ok(_) => JsonResponse::success(StatusCode::OK, true),
|
||||
Err(e) => JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e),
|
||||
}
|
||||
}
|
||||
@@ -1,91 +0,0 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::{extract::State, response::Response};
|
||||
use compute_api::{
|
||||
requests::ConfigurationRequest,
|
||||
responses::{ComputeStatus, ComputeStatusResponse},
|
||||
};
|
||||
use http::StatusCode;
|
||||
use tokio::task;
|
||||
use tracing::info;
|
||||
|
||||
use crate::{
|
||||
compute::{ComputeNode, ParsedSpec},
|
||||
http::{extract::Json, JsonResponse},
|
||||
};
|
||||
|
||||
// Accept spec in JSON format and request compute configuration. If anything
|
||||
// goes wrong after we set the compute status to `ConfigurationPending` and
|
||||
// update compute state with new spec, we basically leave compute in the
|
||||
// potentially wrong state. That said, it's control-plane's responsibility to
|
||||
// watch compute state after reconfiguration request and to clean restart in
|
||||
// case of errors.
|
||||
pub(in crate::http) async fn configure(
|
||||
State(compute): State<Arc<ComputeNode>>,
|
||||
request: Json<ConfigurationRequest>,
|
||||
) -> Response {
|
||||
if !compute.live_config_allowed {
|
||||
return JsonResponse::error(
|
||||
StatusCode::PRECONDITION_FAILED,
|
||||
"live configuration is not allowed for this compute node".to_string(),
|
||||
);
|
||||
}
|
||||
|
||||
let pspec = match ParsedSpec::try_from(request.spec.clone()) {
|
||||
Ok(p) => p,
|
||||
Err(e) => return JsonResponse::error(StatusCode::BAD_REQUEST, e),
|
||||
};
|
||||
|
||||
// XXX: wrap state update under lock in a code block. Otherwise, we will try
|
||||
// to `Send` `mut state` into the spawned thread bellow, which will cause
|
||||
// the following rustc error:
|
||||
//
|
||||
// error: future cannot be sent between threads safely
|
||||
{
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) {
|
||||
return JsonResponse::invalid_status(state.status);
|
||||
}
|
||||
|
||||
state.pspec = Some(pspec);
|
||||
state.set_status(ComputeStatus::ConfigurationPending, &compute.state_changed);
|
||||
drop(state);
|
||||
}
|
||||
|
||||
// Spawn a blocking thread to wait for compute to become Running. This is
|
||||
// needed to do not block the main pool of workers and be able to serve
|
||||
// other requests while some particular request is waiting for compute to
|
||||
// finish configuration.
|
||||
let c = compute.clone();
|
||||
let completed = task::spawn_blocking(move || {
|
||||
let mut state = c.state.lock().unwrap();
|
||||
while state.status != ComputeStatus::Running {
|
||||
state = c.state_changed.wait(state).unwrap();
|
||||
info!(
|
||||
"waiting for compute to become {}, current status: {}",
|
||||
ComputeStatus::Running,
|
||||
state.status
|
||||
);
|
||||
|
||||
if state.status == ComputeStatus::Failed {
|
||||
let err = state.error.as_ref().map_or("unknown error", |x| x);
|
||||
let msg = format!("compute configuration failed: {:?}", err);
|
||||
return Err(msg);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
if let Err(e) = completed {
|
||||
return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e);
|
||||
}
|
||||
|
||||
// Return current compute state if everything went well.
|
||||
let state = compute.state.lock().unwrap().clone();
|
||||
let body = ComputeStatusResponse::from(&state);
|
||||
|
||||
JsonResponse::success(StatusCode::OK, body)
|
||||
}
|
||||
@@ -1,34 +0,0 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::{body::Body, extract::State, response::Response};
|
||||
use http::{header::CONTENT_TYPE, StatusCode};
|
||||
use serde::Deserialize;
|
||||
|
||||
use crate::{
|
||||
catalog::{get_database_schema, SchemaDumpError},
|
||||
compute::ComputeNode,
|
||||
http::{extract::Query, JsonResponse},
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
pub(in crate::http) struct DatabaseSchemaParams {
|
||||
database: String,
|
||||
}
|
||||
|
||||
/// Get a schema dump of the requested database.
|
||||
pub(in crate::http) async fn get_schema_dump(
|
||||
params: Query<DatabaseSchemaParams>,
|
||||
State(compute): State<Arc<ComputeNode>>,
|
||||
) -> Response {
|
||||
match get_database_schema(&compute, ¶ms.database).await {
|
||||
Ok(schema) => Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(CONTENT_TYPE.as_str(), "application/json")
|
||||
.body(Body::from_stream(schema))
|
||||
.unwrap(),
|
||||
Err(SchemaDumpError::DatabaseDoesNotExist) => {
|
||||
JsonResponse::error(StatusCode::NOT_FOUND, SchemaDumpError::DatabaseDoesNotExist)
|
||||
}
|
||||
Err(e) => JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e),
|
||||
}
|
||||
}
|
||||
@@ -1,16 +0,0 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::{extract::State, response::Response};
|
||||
use http::StatusCode;
|
||||
|
||||
use crate::{catalog::get_dbs_and_roles, compute::ComputeNode, http::JsonResponse};
|
||||
|
||||
/// Get the databases and roles from the compute.
|
||||
pub(in crate::http) async fn get_catalog_objects(
|
||||
State(compute): State<Arc<ComputeNode>>,
|
||||
) -> Response {
|
||||
match get_dbs_and_roles(&compute).await {
|
||||
Ok(catalog_objects) => JsonResponse::success(StatusCode::OK, catalog_objects),
|
||||
Err(e) => JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e),
|
||||
}
|
||||
}
|
||||
@@ -1,67 +0,0 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::{
|
||||
extract::State,
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use http::StatusCode;
|
||||
use serde::Deserialize;
|
||||
|
||||
use crate::{
|
||||
compute::ComputeNode,
|
||||
http::{
|
||||
extract::{Path, Query},
|
||||
JsonResponse,
|
||||
},
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
pub(in crate::http) struct ExtensionServerParams {
|
||||
is_library: Option<bool>,
|
||||
}
|
||||
|
||||
/// Download a remote extension.
|
||||
pub(in crate::http) async fn download_extension(
|
||||
Path(filename): Path<String>,
|
||||
params: Query<ExtensionServerParams>,
|
||||
State(compute): State<Arc<ComputeNode>>,
|
||||
) -> Response {
|
||||
// Don't even try to download extensions if no remote storage is configured
|
||||
if compute.ext_remote_storage.is_none() {
|
||||
return JsonResponse::error(
|
||||
StatusCode::PRECONDITION_FAILED,
|
||||
"remote storage is not configured",
|
||||
);
|
||||
}
|
||||
|
||||
let ext = {
|
||||
let state = compute.state.lock().unwrap();
|
||||
let pspec = state.pspec.as_ref().unwrap();
|
||||
let spec = &pspec.spec;
|
||||
|
||||
let remote_extensions = match spec.remote_extensions.as_ref() {
|
||||
Some(r) => r,
|
||||
None => {
|
||||
return JsonResponse::error(
|
||||
StatusCode::CONFLICT,
|
||||
"information about remote extensions is unavailable",
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
remote_extensions.get_ext(
|
||||
&filename,
|
||||
params.is_library.unwrap_or(false),
|
||||
&compute.build_tag,
|
||||
&compute.pgversion,
|
||||
)
|
||||
};
|
||||
|
||||
match ext {
|
||||
Ok((ext_name, ext_path)) => match compute.download_extension(ext_name, ext_path).await {
|
||||
Ok(_) => StatusCode::OK.into_response(),
|
||||
Err(e) => JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e),
|
||||
},
|
||||
Err(e) => JsonResponse::error(StatusCode::NOT_FOUND, e),
|
||||
}
|
||||
}
|
||||
@@ -1,45 +0,0 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::{extract::State, response::Response};
|
||||
use compute_api::{
|
||||
requests::ExtensionInstallRequest,
|
||||
responses::{ComputeStatus, ExtensionInstallResponse},
|
||||
};
|
||||
use http::StatusCode;
|
||||
|
||||
use crate::{
|
||||
compute::ComputeNode,
|
||||
http::{extract::Json, JsonResponse},
|
||||
};
|
||||
|
||||
/// Install a extension.
|
||||
pub(in crate::http) async fn install_extension(
|
||||
State(compute): State<Arc<ComputeNode>>,
|
||||
request: Json<ExtensionInstallRequest>,
|
||||
) -> Response {
|
||||
let status = compute.get_status();
|
||||
if status != ComputeStatus::Running {
|
||||
return JsonResponse::invalid_status(status);
|
||||
}
|
||||
|
||||
match compute
|
||||
.install_extension(
|
||||
&request.extension,
|
||||
&request.database,
|
||||
request.version.to_string(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(version) => JsonResponse::success(
|
||||
StatusCode::CREATED,
|
||||
Some(ExtensionInstallResponse {
|
||||
extension: request.extension.clone(),
|
||||
version,
|
||||
}),
|
||||
),
|
||||
Err(e) => JsonResponse::error(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
format!("failed to install extension: {e}"),
|
||||
),
|
||||
}
|
||||
}
|
||||
@@ -1,35 +0,0 @@
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use http::StatusCode;
|
||||
use tracing::info;
|
||||
use utils::failpoint_support::{apply_failpoint, ConfigureFailpointsRequest};
|
||||
|
||||
use crate::http::{extract::Json, JsonResponse};
|
||||
|
||||
/// Configure failpoints for testing purposes.
|
||||
pub(in crate::http) async fn configure_failpoints(
|
||||
failpoints: Json<ConfigureFailpointsRequest>,
|
||||
) -> Response {
|
||||
if !fail::has_failpoints() {
|
||||
return JsonResponse::error(
|
||||
StatusCode::PRECONDITION_FAILED,
|
||||
"Cannot manage failpoints because neon was compiled without failpoints support",
|
||||
);
|
||||
}
|
||||
|
||||
for fp in &*failpoints {
|
||||
info!("cfg failpoint: {} {}", fp.name, fp.actions);
|
||||
|
||||
// We recognize one extra "action" that's not natively recognized
|
||||
// by the failpoints crate: exit, to immediately kill the process
|
||||
let cfg_result = apply_failpoint(&fp.name, &fp.actions);
|
||||
|
||||
if let Err(e) = cfg_result {
|
||||
return JsonResponse::error(
|
||||
StatusCode::BAD_REQUEST,
|
||||
format!("failed to configure failpoints: {e}"),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
StatusCode::OK.into_response()
|
||||
}
|
||||
@@ -1,48 +0,0 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::{extract::State, response::Response};
|
||||
use compute_api::{
|
||||
requests::SetRoleGrantsRequest,
|
||||
responses::{ComputeStatus, SetRoleGrantsResponse},
|
||||
};
|
||||
use http::StatusCode;
|
||||
|
||||
use crate::{
|
||||
compute::ComputeNode,
|
||||
http::{extract::Json, JsonResponse},
|
||||
};
|
||||
|
||||
/// Add grants for a role.
|
||||
pub(in crate::http) async fn add_grant(
|
||||
State(compute): State<Arc<ComputeNode>>,
|
||||
request: Json<SetRoleGrantsRequest>,
|
||||
) -> Response {
|
||||
let status = compute.get_status();
|
||||
if status != ComputeStatus::Running {
|
||||
return JsonResponse::invalid_status(status);
|
||||
}
|
||||
|
||||
match compute
|
||||
.set_role_grants(
|
||||
&request.database,
|
||||
&request.schema,
|
||||
&request.privileges,
|
||||
&request.role,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(()) => JsonResponse::success(
|
||||
StatusCode::CREATED,
|
||||
Some(SetRoleGrantsResponse {
|
||||
database: request.database.clone(),
|
||||
schema: request.schema.clone(),
|
||||
role: request.role.clone(),
|
||||
privileges: request.privileges.clone(),
|
||||
}),
|
||||
),
|
||||
Err(e) => JsonResponse::error(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
format!("failed to grant role privileges to the schema: {e}"),
|
||||
),
|
||||
}
|
||||
}
|
||||
@@ -1,11 +0,0 @@
|
||||
use axum::response::Response;
|
||||
use compute_api::responses::InfoResponse;
|
||||
use http::StatusCode;
|
||||
|
||||
use crate::http::JsonResponse;
|
||||
|
||||
/// Get information about the physical characteristics about the compute.
|
||||
pub(in crate::http) async fn get_info() -> Response {
|
||||
let num_cpus = num_cpus::get_physical();
|
||||
JsonResponse::success(StatusCode::OK, &InfoResponse { num_cpus })
|
||||
}
|
||||
@@ -1,18 +0,0 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::{extract::State, response::Response};
|
||||
use compute_api::responses::ComputeStatus;
|
||||
use http::StatusCode;
|
||||
|
||||
use crate::{compute::ComputeNode, http::JsonResponse};
|
||||
|
||||
/// Collect current Postgres usage insights.
|
||||
pub(in crate::http) async fn get_insights(State(compute): State<Arc<ComputeNode>>) -> Response {
|
||||
let status = compute.get_status();
|
||||
if status != ComputeStatus::Running {
|
||||
return JsonResponse::invalid_status(status);
|
||||
}
|
||||
|
||||
let insights = compute.collect_insights().await;
|
||||
JsonResponse::success(StatusCode::OK, insights)
|
||||
}
|
||||
@@ -1,33 +0,0 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::{extract::State, response::Response};
|
||||
use compute_api::responses::ComputeStatus;
|
||||
use http::StatusCode;
|
||||
use tokio::task;
|
||||
|
||||
use crate::{compute::ComputeNode, http::JsonResponse, installed_extensions};
|
||||
|
||||
/// Get a list of installed extensions.
|
||||
pub(in crate::http) async fn get_installed_extensions(
|
||||
State(compute): State<Arc<ComputeNode>>,
|
||||
) -> Response {
|
||||
let status = compute.get_status();
|
||||
if status != ComputeStatus::Running {
|
||||
return JsonResponse::invalid_status(status);
|
||||
}
|
||||
|
||||
let conf = compute.get_conn_conf(None);
|
||||
let res = task::spawn_blocking(move || installed_extensions::get_installed_extensions(conf))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
match res {
|
||||
Ok(installed_extensions) => {
|
||||
JsonResponse::success(StatusCode::OK, Some(installed_extensions))
|
||||
}
|
||||
Err(e) => JsonResponse::error(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
format!("failed to get list of installed extensions: {e}"),
|
||||
),
|
||||
}
|
||||
}
|
||||
@@ -1,32 +0,0 @@
|
||||
use axum::{body::Body, response::Response};
|
||||
use http::header::CONTENT_TYPE;
|
||||
use http::StatusCode;
|
||||
use metrics::proto::MetricFamily;
|
||||
use metrics::Encoder;
|
||||
use metrics::TextEncoder;
|
||||
|
||||
use crate::{http::JsonResponse, installed_extensions};
|
||||
|
||||
/// Expose Prometheus metrics.
|
||||
pub(in crate::http) async fn get_metrics() -> Response {
|
||||
// When we call TextEncoder::encode() below, it will immediately return an
|
||||
// error if a metric family has no metrics, so we need to preemptively
|
||||
// filter out metric families with no metrics.
|
||||
let metrics = installed_extensions::collect()
|
||||
.into_iter()
|
||||
.filter(|m| !m.get_metric().is_empty())
|
||||
.collect::<Vec<MetricFamily>>();
|
||||
|
||||
let encoder = TextEncoder::new();
|
||||
let mut buffer = vec![];
|
||||
|
||||
if let Err(e) = encoder.encode(&metrics, &mut buffer) {
|
||||
return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e);
|
||||
}
|
||||
|
||||
Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(CONTENT_TYPE, encoder.format_type())
|
||||
.body(Body::from(buffer))
|
||||
.unwrap()
|
||||
}
|
||||
@@ -1,12 +0,0 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::{extract::State, response::Response};
|
||||
use http::StatusCode;
|
||||
|
||||
use crate::{compute::ComputeNode, http::JsonResponse};
|
||||
|
||||
/// Get startup metrics.
|
||||
pub(in crate::http) async fn get_metrics(State(compute): State<Arc<ComputeNode>>) -> Response {
|
||||
let metrics = compute.state.lock().unwrap().metrics.clone();
|
||||
JsonResponse::success(StatusCode::OK, metrics)
|
||||
}
|
||||
@@ -1,38 +0,0 @@
|
||||
use compute_api::responses::ComputeStatusResponse;
|
||||
|
||||
use crate::compute::ComputeState;
|
||||
|
||||
pub(in crate::http) mod check_writability;
|
||||
pub(in crate::http) mod configure;
|
||||
pub(in crate::http) mod database_schema;
|
||||
pub(in crate::http) mod dbs_and_roles;
|
||||
pub(in crate::http) mod extension_server;
|
||||
pub(in crate::http) mod extensions;
|
||||
pub(in crate::http) mod failpoints;
|
||||
pub(in crate::http) mod grants;
|
||||
pub(in crate::http) mod info;
|
||||
pub(in crate::http) mod insights;
|
||||
pub(in crate::http) mod installed_extensions;
|
||||
pub(in crate::http) mod metrics;
|
||||
pub(in crate::http) mod metrics_json;
|
||||
pub(in crate::http) mod status;
|
||||
pub(in crate::http) mod terminate;
|
||||
|
||||
impl From<&ComputeState> for ComputeStatusResponse {
|
||||
fn from(state: &ComputeState) -> Self {
|
||||
ComputeStatusResponse {
|
||||
start_time: state.start_time,
|
||||
tenant: state
|
||||
.pspec
|
||||
.as_ref()
|
||||
.map(|pspec| pspec.tenant_id.to_string()),
|
||||
timeline: state
|
||||
.pspec
|
||||
.as_ref()
|
||||
.map(|pspec| pspec.timeline_id.to_string()),
|
||||
status: state.status,
|
||||
last_active: state.last_active,
|
||||
error: state.error.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,14 +0,0 @@
|
||||
use std::{ops::Deref, sync::Arc};
|
||||
|
||||
use axum::{extract::State, http::StatusCode, response::Response};
|
||||
use compute_api::responses::ComputeStatusResponse;
|
||||
|
||||
use crate::{compute::ComputeNode, http::JsonResponse};
|
||||
|
||||
/// Retrieve the state of the comute.
|
||||
pub(in crate::http) async fn get_status(State(compute): State<Arc<ComputeNode>>) -> Response {
|
||||
let state = compute.state.lock().unwrap();
|
||||
let body = ComputeStatusResponse::from(state.deref());
|
||||
|
||||
JsonResponse::success(StatusCode::OK, body)
|
||||
}
|
||||
@@ -1,58 +0,0 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::{
|
||||
extract::State,
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use compute_api::responses::ComputeStatus;
|
||||
use http::StatusCode;
|
||||
use tokio::task;
|
||||
use tracing::info;
|
||||
|
||||
use crate::{
|
||||
compute::{forward_termination_signal, ComputeNode},
|
||||
http::JsonResponse,
|
||||
};
|
||||
|
||||
/// Terminate the compute.
|
||||
pub(in crate::http) async fn terminate(State(compute): State<Arc<ComputeNode>>) -> Response {
|
||||
{
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
if state.status == ComputeStatus::Terminated {
|
||||
return StatusCode::CREATED.into_response();
|
||||
}
|
||||
|
||||
if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) {
|
||||
return JsonResponse::invalid_status(state.status);
|
||||
}
|
||||
|
||||
state.set_status(ComputeStatus::TerminationPending, &compute.state_changed);
|
||||
drop(state);
|
||||
}
|
||||
|
||||
forward_termination_signal();
|
||||
info!("sent signal and notified waiters");
|
||||
|
||||
// Spawn a blocking thread to wait for compute to become Terminated.
|
||||
// This is needed to do not block the main pool of workers and
|
||||
// be able to serve other requests while some particular request
|
||||
// is waiting for compute to finish configuration.
|
||||
let c = compute.clone();
|
||||
task::spawn_blocking(move || {
|
||||
let mut state = c.state.lock().unwrap();
|
||||
while state.status != ComputeStatus::Terminated {
|
||||
state = c.state_changed.wait(state).unwrap();
|
||||
info!(
|
||||
"waiting for compute to become {}, current status: {:?}",
|
||||
ComputeStatus::Terminated,
|
||||
state.status
|
||||
);
|
||||
}
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
info!("terminated Postgres");
|
||||
|
||||
StatusCode::OK.into_response()
|
||||
}
|
||||
@@ -1,165 +0,0 @@
|
||||
use std::{
|
||||
net::{IpAddr, Ipv6Addr, SocketAddr},
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
},
|
||||
thread,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use anyhow::Result;
|
||||
use axum::{
|
||||
response::{IntoResponse, Response},
|
||||
routing::{get, post},
|
||||
Router,
|
||||
};
|
||||
use http::StatusCode;
|
||||
use tokio::net::TcpListener;
|
||||
use tower::ServiceBuilder;
|
||||
use tower_http::{
|
||||
request_id::{MakeRequestId, PropagateRequestIdLayer, RequestId, SetRequestIdLayer},
|
||||
trace::TraceLayer,
|
||||
};
|
||||
use tracing::{debug, error, info, Span};
|
||||
|
||||
use super::routes::{
|
||||
check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions,
|
||||
grants, info as info_route, insights, installed_extensions, metrics, metrics_json, status,
|
||||
terminate,
|
||||
};
|
||||
use crate::compute::ComputeNode;
|
||||
|
||||
async fn handle_404() -> Response {
|
||||
StatusCode::NOT_FOUND.into_response()
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
struct ComputeMakeRequestId(Arc<AtomicU64>);
|
||||
|
||||
impl MakeRequestId for ComputeMakeRequestId {
|
||||
fn make_request_id<B>(
|
||||
&mut self,
|
||||
_request: &http::Request<B>,
|
||||
) -> Option<tower_http::request_id::RequestId> {
|
||||
let request_id = self
|
||||
.0
|
||||
.fetch_add(1, Ordering::SeqCst)
|
||||
.to_string()
|
||||
.parse()
|
||||
.unwrap();
|
||||
|
||||
Some(RequestId::new(request_id))
|
||||
}
|
||||
}
|
||||
|
||||
/// Run the HTTP server and wait on it forever.
|
||||
#[tokio::main]
|
||||
async fn serve(port: u16, compute: Arc<ComputeNode>) {
|
||||
const X_REQUEST_ID: &str = "x-request-id";
|
||||
|
||||
let mut app = Router::new()
|
||||
.route("/check_writability", post(check_writability::is_writable))
|
||||
.route("/configure", post(configure::configure))
|
||||
.route("/database_schema", get(database_schema::get_schema_dump))
|
||||
.route("/dbs_and_roles", get(dbs_and_roles::get_catalog_objects))
|
||||
.route(
|
||||
"/extension_server/*filename",
|
||||
post(extension_server::download_extension),
|
||||
)
|
||||
.route("/extensions", post(extensions::install_extension))
|
||||
.route("/grants", post(grants::add_grant))
|
||||
.route("/info", get(info_route::get_info))
|
||||
.route("/insights", get(insights::get_insights))
|
||||
.route(
|
||||
"/installed_extensions",
|
||||
get(installed_extensions::get_installed_extensions),
|
||||
)
|
||||
.route("/metrics", get(metrics::get_metrics))
|
||||
.route("/metrics.json", get(metrics_json::get_metrics))
|
||||
.route("/status", get(status::get_status))
|
||||
.route("/terminate", post(terminate::terminate))
|
||||
.fallback(handle_404)
|
||||
.layer(
|
||||
ServiceBuilder::new()
|
||||
.layer(SetRequestIdLayer::x_request_id(
|
||||
ComputeMakeRequestId::default(),
|
||||
))
|
||||
.layer(
|
||||
TraceLayer::new_for_http()
|
||||
.on_request(|request: &http::Request<_>, _span: &Span| {
|
||||
let request_id = request
|
||||
.headers()
|
||||
.get(X_REQUEST_ID)
|
||||
.unwrap()
|
||||
.to_str()
|
||||
.unwrap();
|
||||
|
||||
match request.uri().path() {
|
||||
"/metrics" => {
|
||||
debug!(%request_id, "{} {}", request.method(), request.uri())
|
||||
}
|
||||
_ => info!(%request_id, "{} {}", request.method(), request.uri()),
|
||||
};
|
||||
})
|
||||
.on_response(
|
||||
|response: &http::Response<_>, latency: Duration, _span: &Span| {
|
||||
let request_id = response
|
||||
.headers()
|
||||
.get(X_REQUEST_ID)
|
||||
.unwrap()
|
||||
.to_str()
|
||||
.unwrap();
|
||||
|
||||
info!(
|
||||
%request_id,
|
||||
code = response.status().as_u16(),
|
||||
latency = latency.as_millis()
|
||||
)
|
||||
},
|
||||
),
|
||||
)
|
||||
.layer(PropagateRequestIdLayer::x_request_id()),
|
||||
)
|
||||
.with_state(compute);
|
||||
|
||||
// Add in any testing support
|
||||
if cfg!(feature = "testing") {
|
||||
use super::routes::failpoints;
|
||||
|
||||
app = app.route("/failpoints", post(failpoints::configure_failpoints))
|
||||
}
|
||||
|
||||
// This usually binds to both IPv4 and IPv6 on Linux, see
|
||||
// https://github.com/rust-lang/rust/pull/34440 for more information
|
||||
let addr = SocketAddr::new(IpAddr::from(Ipv6Addr::UNSPECIFIED), port);
|
||||
let listener = match TcpListener::bind(&addr).await {
|
||||
Ok(listener) => listener,
|
||||
Err(e) => {
|
||||
error!(
|
||||
"failed to bind the compute_ctl HTTP server to port {}: {}",
|
||||
port, e
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if let Ok(local_addr) = listener.local_addr() {
|
||||
info!("compute_ctl HTTP server listening on {}", local_addr);
|
||||
} else {
|
||||
info!("compute_ctl HTTP server listening on port {}", port);
|
||||
}
|
||||
|
||||
if let Err(e) = axum::serve(listener, app).await {
|
||||
error!("compute_ctl HTTP server error: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
/// Launch a separate HTTP server thread and return its `JoinHandle`.
|
||||
pub fn launch_http_server(port: u16, state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
|
||||
let state = Arc::clone(state);
|
||||
|
||||
Ok(thread::Builder::new()
|
||||
.name("http-server".into())
|
||||
.spawn(move || serve(port, state))?)
|
||||
}
|
||||
@@ -3,6 +3,8 @@
|
||||
#![deny(unsafe_code)]
|
||||
#![deny(clippy::undocumented_unsafe_blocks)]
|
||||
|
||||
extern crate hyper0 as hyper;
|
||||
|
||||
pub mod checker;
|
||||
pub mod config;
|
||||
pub mod configurator;
|
||||
|
||||
@@ -75,7 +75,7 @@ pub struct MutableApplyContext {
|
||||
pub dbs: HashMap<String, Database>,
|
||||
}
|
||||
|
||||
/// Apply the operations that belong to the given spec apply phase.
|
||||
/// Appply the operations that belong to the given spec apply phase.
|
||||
///
|
||||
/// Commands within a single phase are executed in order of Iterator yield.
|
||||
/// Commands of ApplySpecPhase::RunInEachDatabase will execute in the database
|
||||
@@ -498,19 +498,7 @@ async fn get_operations<'a>(
|
||||
),
|
||||
comment: None,
|
||||
},
|
||||
// Revoke some potentially blocking privileges (Neon-specific currently)
|
||||
Operation {
|
||||
query: format!(
|
||||
include_str!("sql/pre_drop_role_revoke_privileges.sql"),
|
||||
role_name = quoted,
|
||||
),
|
||||
comment: None,
|
||||
},
|
||||
// This now will only drop privileges of the role
|
||||
// TODO: this is obviously not 100% true because of the above case,
|
||||
// there could be still some privileges that are not revoked. Maybe this
|
||||
// only drops privileges that were granted *by this* role, not *to this* role,
|
||||
// but this has to be checked.
|
||||
Operation {
|
||||
query: format!("DROP OWNED BY {}", quoted),
|
||||
comment: None,
|
||||
|
||||
@@ -1,28 +0,0 @@
|
||||
SET SESSION ROLE neon_superuser;
|
||||
|
||||
DO $$
|
||||
DECLARE
|
||||
schema TEXT;
|
||||
revoke_query TEXT;
|
||||
BEGIN
|
||||
FOR schema IN
|
||||
SELECT schema_name
|
||||
FROM information_schema.schemata
|
||||
-- So far, we only had issues with 'public' schema. Probably, because we do some additional grants,
|
||||
-- e.g., make DB owner the owner of 'public' schema automatically (when created via API).
|
||||
-- See https://github.com/neondatabase/cloud/issues/13582 for the context.
|
||||
-- Still, keep the loop because i) it efficiently handles the case when there is no 'public' schema,
|
||||
-- ii) it's easy to add more schemas to the list if needed.
|
||||
WHERE schema_name IN ('public')
|
||||
LOOP
|
||||
revoke_query := format(
|
||||
'REVOKE ALL PRIVILEGES ON ALL TABLES IN SCHEMA %I FROM {role_name} GRANTED BY neon_superuser;',
|
||||
schema
|
||||
);
|
||||
|
||||
EXECUTE revoke_query;
|
||||
END LOOP;
|
||||
END;
|
||||
$$;
|
||||
|
||||
RESET ROLE;
|
||||
@@ -62,7 +62,7 @@ use crate::local_env::LocalEnv;
|
||||
use crate::postgresql_conf::PostgresConf;
|
||||
use crate::storage_controller::StorageController;
|
||||
|
||||
use compute_api::responses::{ComputeStatus, ComputeStatusResponse};
|
||||
use compute_api::responses::{ComputeState, ComputeStatus};
|
||||
use compute_api::spec::{Cluster, ComputeFeature, ComputeMode, ComputeSpec};
|
||||
|
||||
// contents of a endpoint.json file
|
||||
@@ -739,7 +739,7 @@ impl Endpoint {
|
||||
}
|
||||
|
||||
// Call the /status HTTP API
|
||||
pub async fn get_status(&self) -> Result<ComputeStatusResponse> {
|
||||
pub async fn get_status(&self) -> Result<ComputeState> {
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
let response = client
|
||||
|
||||
@@ -1035,15 +1035,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
resp.sort_by(|a, b| a.id.cmp(&b.id));
|
||||
|
||||
let mut table = comfy_table::Table::new();
|
||||
table.set_header([
|
||||
"Id",
|
||||
"Version",
|
||||
"Host",
|
||||
"Port",
|
||||
"Http Port",
|
||||
"AZ Id",
|
||||
"Scheduling",
|
||||
]);
|
||||
table.set_header(["Id", "Version", "Host", "Port", "Http Port", "AZ Id"]);
|
||||
for sk in resp {
|
||||
table.add_row([
|
||||
format!("{}", sk.id),
|
||||
@@ -1051,8 +1043,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
sk.host,
|
||||
format!("{}", sk.port),
|
||||
format!("{}", sk.http_port),
|
||||
sk.availability_zone_id.clone(),
|
||||
String::from(sk.scheduling_policy),
|
||||
sk.availability_zone_id.to_string(),
|
||||
]);
|
||||
}
|
||||
println!("{table}");
|
||||
|
||||
1
debug-oom/.gitignore
vendored
Normal file
1
debug-oom/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
backup.tar.gz
|
||||
21
debug-oom/README.md
Normal file
21
debug-oom/README.md
Normal file
@@ -0,0 +1,21 @@
|
||||
To build a compute image:
|
||||
```
|
||||
docker build --build-arg GIT_VERSION=custombuild --build-arg PG_VERSION=v16 -t neon-local-v16 -f ../compute/compute-node.Dockerfile .. && \
|
||||
../../autoscaling/bin/vm-builder \
|
||||
-spec=../compute/vm-image-spec-bullseye.yaml \
|
||||
-src=neon-local-v16:latest \
|
||||
-dst=vm-neon-local-v16:latest \
|
||||
-target-arch=linux/amd64 \
|
||||
-size 2G && \
|
||||
../../autoscaling/bin/kind load docker-image vm-neon-local-v16:latest --name neonvm-arthur
|
||||
```
|
||||
|
||||
To start a compute node:
|
||||
```
|
||||
kubectl apply -f ./spec.yml
|
||||
```
|
||||
|
||||
How to destroy:
|
||||
```
|
||||
kubectl delete -f ./spec.yml
|
||||
```
|
||||
99
debug-oom/spec.yml
Normal file
99
debug-oom/spec.yml
Normal file
@@ -0,0 +1,99 @@
|
||||
apiVersion: vm.neon.tech/v1
|
||||
kind: VirtualMachine
|
||||
metadata:
|
||||
annotations:
|
||||
autoscaling.neon.tech/bounds: '{"min":{"cpu":"250m","mem":"1Gi"},"max":{"cpu":"2","mem":"8Gi"}}'
|
||||
autoscaling.neon.tech/config: '{"enableLFCMetrics":true}'
|
||||
creationTimestamp: "2025-01-04T18:37:29Z"
|
||||
finalizers:
|
||||
- vm.neon.tech/finalizer
|
||||
generation: 1
|
||||
labels:
|
||||
autoscaling.neon.tech/enabled: "true"
|
||||
neon/component: compute-node
|
||||
neon/compute-id: compute-purple-art-unreal
|
||||
neon/endpoint-id: ep-unreal
|
||||
name: compute-purple-art-unreal
|
||||
namespace: default
|
||||
spec:
|
||||
cpuScalingMode: QmpScaling
|
||||
disks:
|
||||
- emptyDisk:
|
||||
discard: true
|
||||
size: 36096Mi
|
||||
mountPath: /neonvm/cache
|
||||
name: cache
|
||||
readOnly: false
|
||||
- emptyDisk:
|
||||
discard: true
|
||||
enableQuotas: true
|
||||
size: 150Gi
|
||||
mountPath: /var/db/postgres/compute
|
||||
name: pgdata
|
||||
readOnly: false
|
||||
enableAcceleration: true
|
||||
enableNetworkMonitoring: false
|
||||
enableSSH: true
|
||||
guest:
|
||||
args:
|
||||
- -c
|
||||
- /usr/local/bin/compute_ctl -D /var/db/postgres/compute/pgdata -b /usr/local/bin/postgres
|
||||
-C postgresql://cloud_admin@127.0.0.1/postgres?options=-c%20default_transaction_read_only%3Dfalse
|
||||
--compute-id compute-purple-art-unreal --control-plane-uri http://dontexist.local:9096
|
||||
--resize-swap-on-bind --set-disk-quota-for-fs /var/db/postgres/compute 2>&1
|
||||
command:
|
||||
- /bin/sh
|
||||
cpus:
|
||||
max: 10
|
||||
min: 250m
|
||||
use: 500m
|
||||
env:
|
||||
- name: RUST_LOG
|
||||
value: info
|
||||
- name: OTEL_SDK_DISABLED
|
||||
value: "true"
|
||||
- name: AUTOSCALING
|
||||
value: "true"
|
||||
memorySlotSize: 1Gi
|
||||
memorySlots:
|
||||
max: 40
|
||||
min: 1
|
||||
use: 2
|
||||
ports:
|
||||
- name: postgres
|
||||
port: 5432
|
||||
protocol: TCP
|
||||
- name: control
|
||||
port: 3080
|
||||
protocol: TCP
|
||||
- name: pooler
|
||||
port: 6432
|
||||
protocol: TCP
|
||||
- name: host-metrics
|
||||
port: 9100
|
||||
protocol: TCP
|
||||
- name: metrics
|
||||
port: 9187
|
||||
protocol: TCP
|
||||
- name: sql-exporter
|
||||
port: 9399
|
||||
protocol: TCP
|
||||
- name: sql-exporter-2
|
||||
port: 9499
|
||||
protocol: TCP
|
||||
- name: vm-monitor
|
||||
port: 10301
|
||||
protocol: TCP
|
||||
- name: local-proxy
|
||||
port: 10432
|
||||
protocol: TCP
|
||||
rootDisk:
|
||||
image: vm-neon-local-v16
|
||||
imagePullPolicy: IfNotPresent
|
||||
size: 20Gi
|
||||
settings:
|
||||
swap: 40Gi
|
||||
sysctl:
|
||||
- vm.overcommit_memory=2
|
||||
restartPolicy: Always
|
||||
schedulerName: autoscale-scheduler
|
||||
@@ -7,11 +7,15 @@ Currently we build two main images:
|
||||
- [neondatabase/neon](https://hub.docker.com/repository/docker/neondatabase/neon) — image with pre-built `pageserver`, `safekeeper` and `proxy` binaries and all the required runtime dependencies. Built from [/Dockerfile](/Dockerfile).
|
||||
- [neondatabase/compute-node-v16](https://hub.docker.com/repository/docker/neondatabase/compute-node-v16) — compute node image with pre-built Postgres binaries from [neondatabase/postgres](https://github.com/neondatabase/postgres). Similar images exist for v15 and v14. Built from [/compute-node/Dockerfile](/compute/compute-node.Dockerfile).
|
||||
|
||||
And additional intermediate image:
|
||||
|
||||
- [neondatabase/compute-tools](https://hub.docker.com/repository/docker/neondatabase/compute-tools) — compute node configuration management tools.
|
||||
|
||||
## Build pipeline
|
||||
|
||||
We build all images after a successful `release` tests run and push automatically to Docker Hub with two parallel CI jobs
|
||||
|
||||
1. `neondatabase/compute-node-v17` (and -16, -v15, -v14)
|
||||
1. `neondatabase/compute-tools` and `neondatabase/compute-node-v16` (and -v15 and -v14)
|
||||
|
||||
2. `neondatabase/neon`
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# Storage broker
|
||||
|
||||
Storage broker targets two issues
|
||||
Storage broker targets two issues:
|
||||
- Allowing safekeepers and pageservers learn which nodes also hold their
|
||||
timelines, and timeline statuses there.
|
||||
- Avoiding O(n^2) connections between storage nodes while doing so.
|
||||
@@ -19,7 +19,7 @@ Currently, the only message is `SafekeeperTimelineInfo`. Each safekeeper, for
|
||||
each active timeline, once in a while pushes timeline status to the broker.
|
||||
Other nodes subscribe and receive this info, using it per above.
|
||||
|
||||
Broker serves /metrics on the same port as grpc service.
|
||||
Broker serves /metrics on the same port as grpc service.
|
||||
|
||||
grpcurl can be used to check which values are currently being pushed:
|
||||
```
|
||||
|
||||
@@ -15,17 +15,6 @@ pub struct GenericAPIError {
|
||||
pub error: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct InfoResponse {
|
||||
pub num_cpus: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct ExtensionInstallResponse {
|
||||
pub extension: PgIdent,
|
||||
pub version: ExtVersion,
|
||||
}
|
||||
|
||||
/// Response of the /status API
|
||||
#[derive(Serialize, Debug, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
@@ -39,6 +28,16 @@ pub struct ComputeStatusResponse {
|
||||
pub error: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub struct ComputeState {
|
||||
pub status: ComputeStatus,
|
||||
/// Timestamp of the last Postgres activity
|
||||
#[serde(serialize_with = "rfc3339_serialize")]
|
||||
pub last_active: Option<DateTime<Utc>>,
|
||||
pub error: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ComputeStatus {
|
||||
@@ -79,7 +78,7 @@ impl Display for ComputeStatus {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn rfc3339_serialize<S>(x: &Option<DateTime<Utc>>, s: S) -> Result<S::Ok, S::Error>
|
||||
fn rfc3339_serialize<S>(x: &Option<DateTime<Utc>>, s: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
|
||||
@@ -320,38 +320,6 @@ impl From<NodeSchedulingPolicy> for String {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
|
||||
pub enum SkSchedulingPolicy {
|
||||
Active,
|
||||
Disabled,
|
||||
Decomissioned,
|
||||
}
|
||||
|
||||
impl FromStr for SkSchedulingPolicy {
|
||||
type Err = anyhow::Error;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
Ok(match s {
|
||||
"active" => Self::Active,
|
||||
"disabled" => Self::Disabled,
|
||||
"decomissioned" => Self::Decomissioned,
|
||||
_ => return Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SkSchedulingPolicy> for String {
|
||||
fn from(value: SkSchedulingPolicy) -> String {
|
||||
use SkSchedulingPolicy::*;
|
||||
match value {
|
||||
Active => "active",
|
||||
Disabled => "disabled",
|
||||
Decomissioned => "decomissioned",
|
||||
}
|
||||
.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
/// Controls how tenant shards are mapped to locations on pageservers, e.g. whether
|
||||
/// to create secondary locations.
|
||||
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
|
||||
@@ -419,7 +387,6 @@ pub struct SafekeeperDescribeResponse {
|
||||
pub port: i32,
|
||||
pub http_port: i32,
|
||||
pub availability_zone_id: String,
|
||||
pub scheduling_policy: SkSchedulingPolicy,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -272,8 +272,6 @@ pub struct CompactInfoResponse {
|
||||
pub compact_key_range: Option<CompactKeyRange>,
|
||||
pub compact_lsn_range: Option<CompactLsnRange>,
|
||||
pub sub_compaction: bool,
|
||||
pub running: bool,
|
||||
pub job_id: usize,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
|
||||
@@ -115,15 +115,13 @@ fn default_max_keys_per_list_response() -> Option<i32> {
|
||||
}
|
||||
|
||||
fn default_azure_conn_pool_size() -> usize {
|
||||
// By default, the Azure SDK does no connection pooling, due to historic reports of hard-to-reproduce issues
|
||||
// Conservative default: no connection pooling. At time of writing this is the Azure
|
||||
// SDK's default as well, due to historic reports of hard-to-reproduce issues
|
||||
// (https://github.com/hyperium/hyper/issues/2312)
|
||||
//
|
||||
// However, using connection pooling is important to avoid exhausting client ports when
|
||||
// doing huge numbers of requests (https://github.com/neondatabase/cloud/issues/20971)
|
||||
//
|
||||
// We therefore enable a modest pool size by default: this may be configured to zero if
|
||||
// issues like the alleged upstream hyper issue appear.
|
||||
8
|
||||
0
|
||||
}
|
||||
|
||||
impl Debug for S3Config {
|
||||
|
||||
@@ -38,6 +38,7 @@ pub mod http;
|
||||
|
||||
use opentelemetry::trace::TracerProvider;
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry_sdk::Resource;
|
||||
use tracing::Subscriber;
|
||||
use tracing_subscriber::registry::LookupSpan;
|
||||
use tracing_subscriber::Layer;
|
||||
@@ -120,10 +121,7 @@ where
|
||||
S: Subscriber + for<'span> LookupSpan<'span>,
|
||||
{
|
||||
// Sets up exporter from the OTEL_EXPORTER_* environment variables.
|
||||
let exporter = opentelemetry_otlp::SpanExporter::builder()
|
||||
.with_http()
|
||||
.build()
|
||||
.expect("could not initialize opentelemetry exporter");
|
||||
let exporter = opentelemetry_otlp::new_exporter().http();
|
||||
|
||||
// TODO: opentelemetry::global::set_error_handler() with custom handler that
|
||||
// bypasses default tracing layers, but logs regular looking log
|
||||
@@ -134,13 +132,17 @@ where
|
||||
opentelemetry_sdk::propagation::TraceContextPropagator::new(),
|
||||
);
|
||||
|
||||
let tracer = opentelemetry_sdk::trace::TracerProvider::builder()
|
||||
.with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
|
||||
.with_resource(opentelemetry_sdk::Resource::new(vec![KeyValue::new(
|
||||
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
|
||||
service_name,
|
||||
)]))
|
||||
.build()
|
||||
let tracer = opentelemetry_otlp::new_pipeline()
|
||||
.tracing()
|
||||
.with_exporter(exporter)
|
||||
.with_trace_config(opentelemetry_sdk::trace::Config::default().with_resource(
|
||||
Resource::new(vec![KeyValue::new(
|
||||
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
|
||||
service_name,
|
||||
)]),
|
||||
))
|
||||
.install_batch(opentelemetry_sdk::runtime::Tokio)
|
||||
.expect("could not initialize opentelemetry exporter")
|
||||
.tracer("global");
|
||||
|
||||
tracing_opentelemetry::layer().with_tracer(tracer)
|
||||
|
||||
@@ -26,7 +26,6 @@ git-version.workspace = true
|
||||
hex = { workspace = true, features = ["serde"] }
|
||||
humantime.workspace = true
|
||||
hyper0 = { workspace = true, features = ["full"] }
|
||||
inferno.workspace = true
|
||||
itertools.workspace = true
|
||||
fail.workspace = true
|
||||
futures = { workspace = true }
|
||||
|
||||
@@ -417,7 +417,6 @@ pub async fn profile_heap_handler(req: Request<Body>) -> Result<Response<Body>,
|
||||
enum Format {
|
||||
Jemalloc,
|
||||
Pprof,
|
||||
Svg,
|
||||
}
|
||||
|
||||
// Parameters.
|
||||
@@ -425,24 +424,9 @@ pub async fn profile_heap_handler(req: Request<Body>) -> Result<Response<Body>,
|
||||
None => Format::Pprof,
|
||||
Some("jemalloc") => Format::Jemalloc,
|
||||
Some("pprof") => Format::Pprof,
|
||||
Some("svg") => Format::Svg,
|
||||
Some(format) => return Err(ApiError::BadRequest(anyhow!("invalid format {format}"))),
|
||||
};
|
||||
|
||||
// Functions and mappings to strip when symbolizing pprof profiles. If true,
|
||||
// also remove child frames.
|
||||
static STRIP_FUNCTIONS: Lazy<Vec<(Regex, bool)>> = Lazy::new(|| {
|
||||
vec![
|
||||
(Regex::new("^__rust").unwrap(), false),
|
||||
(Regex::new("^_start$").unwrap(), false),
|
||||
(Regex::new("^irallocx_prof").unwrap(), true),
|
||||
(Regex::new("^prof_alloc_prep").unwrap(), true),
|
||||
(Regex::new("^std::rt::lang_start").unwrap(), false),
|
||||
(Regex::new("^std::sys::backtrace::__rust").unwrap(), false),
|
||||
]
|
||||
});
|
||||
const STRIP_MAPPINGS: &[&str] = &["libc", "libgcc", "pthread", "vdso"];
|
||||
|
||||
// Obtain profiler handle.
|
||||
let mut prof_ctl = jemalloc_pprof::PROF_CTL
|
||||
.as_ref()
|
||||
@@ -480,9 +464,24 @@ pub async fn profile_heap_handler(req: Request<Body>) -> Result<Response<Body>,
|
||||
// Symbolize the profile.
|
||||
// TODO: consider moving this upstream to jemalloc_pprof and avoiding the
|
||||
// serialization roundtrip.
|
||||
static STRIP_FUNCTIONS: Lazy<Vec<(Regex, bool)>> = Lazy::new(|| {
|
||||
// Functions to strip from profiles. If true, also remove child frames.
|
||||
vec![
|
||||
(Regex::new("^__rust").unwrap(), false),
|
||||
(Regex::new("^_start$").unwrap(), false),
|
||||
(Regex::new("^irallocx_prof").unwrap(), true),
|
||||
(Regex::new("^prof_alloc_prep").unwrap(), true),
|
||||
(Regex::new("^std::rt::lang_start").unwrap(), false),
|
||||
(Regex::new("^std::sys::backtrace::__rust").unwrap(), false),
|
||||
]
|
||||
});
|
||||
let profile = pprof::decode(&bytes)?;
|
||||
let profile = pprof::symbolize(profile)?;
|
||||
let profile = pprof::strip_locations(profile, STRIP_MAPPINGS, &STRIP_FUNCTIONS);
|
||||
let profile = pprof::strip_locations(
|
||||
profile,
|
||||
&["libc", "libgcc", "pthread", "vdso"],
|
||||
&STRIP_FUNCTIONS,
|
||||
);
|
||||
pprof::encode(&profile)
|
||||
})
|
||||
.await
|
||||
@@ -495,27 +494,6 @@ pub async fn profile_heap_handler(req: Request<Body>) -> Result<Response<Body>,
|
||||
.body(Body::from(data))
|
||||
.map_err(|err| ApiError::InternalServerError(err.into()))
|
||||
}
|
||||
|
||||
Format::Svg => {
|
||||
let body = tokio::task::spawn_blocking(move || {
|
||||
let bytes = prof_ctl.dump_pprof()?;
|
||||
let profile = pprof::decode(&bytes)?;
|
||||
let profile = pprof::symbolize(profile)?;
|
||||
let profile = pprof::strip_locations(profile, STRIP_MAPPINGS, &STRIP_FUNCTIONS);
|
||||
let mut opts = inferno::flamegraph::Options::default();
|
||||
opts.title = "Heap inuse".to_string();
|
||||
opts.count_name = "bytes".to_string();
|
||||
pprof::flamegraph(profile, &mut opts)
|
||||
})
|
||||
.await
|
||||
.map_err(|join_err| ApiError::InternalServerError(join_err.into()))?
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
Response::builder()
|
||||
.status(200)
|
||||
.header(CONTENT_TYPE, "image/svg+xml")
|
||||
.body(Body::from(body))
|
||||
.map_err(|err| ApiError::InternalServerError(err.into()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
use anyhow::bail;
|
||||
use flate2::write::{GzDecoder, GzEncoder};
|
||||
use flate2::Compression;
|
||||
use itertools::Itertools as _;
|
||||
use once_cell::sync::Lazy;
|
||||
use pprof::protos::{Function, Line, Location, Message as _, Profile};
|
||||
use pprof::protos::{Function, Line, Message as _, Profile};
|
||||
use regex::Regex;
|
||||
|
||||
use std::borrow::Cow;
|
||||
@@ -189,59 +188,3 @@ pub fn strip_locations(
|
||||
|
||||
profile
|
||||
}
|
||||
|
||||
/// Generates an SVG flamegraph from a symbolized pprof profile.
|
||||
pub fn flamegraph(
|
||||
profile: Profile,
|
||||
opts: &mut inferno::flamegraph::Options,
|
||||
) -> anyhow::Result<Vec<u8>> {
|
||||
if profile.mapping.iter().any(|m| !m.has_functions) {
|
||||
bail!("profile not symbolized");
|
||||
}
|
||||
|
||||
// Index locations, functions, and strings.
|
||||
let locations: HashMap<u64, Location> =
|
||||
profile.location.into_iter().map(|l| (l.id, l)).collect();
|
||||
let functions: HashMap<u64, Function> =
|
||||
profile.function.into_iter().map(|f| (f.id, f)).collect();
|
||||
let strings = profile.string_table;
|
||||
|
||||
// Resolve stacks as function names, and sum sample values per stack. Also reverse the stack,
|
||||
// since inferno expects it bottom-up.
|
||||
let mut stacks: HashMap<Vec<&str>, i64> = HashMap::new();
|
||||
for sample in profile.sample {
|
||||
let mut stack = Vec::with_capacity(sample.location_id.len());
|
||||
for location in sample.location_id.into_iter().rev() {
|
||||
let Some(location) = locations.get(&location) else {
|
||||
bail!("missing location {location}");
|
||||
};
|
||||
for line in location.line.iter().rev() {
|
||||
let Some(function) = functions.get(&line.function_id) else {
|
||||
bail!("missing function {}", line.function_id);
|
||||
};
|
||||
let Some(name) = strings.get(function.name as usize) else {
|
||||
bail!("missing string {}", function.name);
|
||||
};
|
||||
stack.push(name.as_str());
|
||||
}
|
||||
}
|
||||
let Some(&value) = sample.value.first() else {
|
||||
bail!("missing value");
|
||||
};
|
||||
*stacks.entry(stack).or_default() += value;
|
||||
}
|
||||
|
||||
// Construct stack lines for inferno.
|
||||
let lines = stacks
|
||||
.into_iter()
|
||||
.map(|(stack, value)| (stack.into_iter().join(";"), value))
|
||||
.map(|(stack, value)| format!("{stack} {value}"))
|
||||
.sorted()
|
||||
.collect_vec();
|
||||
|
||||
// Construct the flamegraph.
|
||||
let mut bytes = Vec::new();
|
||||
let lines = lines.iter().map(|line| line.as_str());
|
||||
inferno::flamegraph::from_lines(opts, lines, &mut bytes)?;
|
||||
Ok(bytes)
|
||||
}
|
||||
|
||||
@@ -96,11 +96,7 @@ impl<T: Send> Sender<T> {
|
||||
}
|
||||
}
|
||||
State::SenderWaitsForReceiverToConsume(_data) => {
|
||||
// SAFETY: send is single threaded due to `&mut self` requirement,
|
||||
// therefore register is not concurrent.
|
||||
unsafe {
|
||||
self.state.wake_sender.register(cx.waker());
|
||||
}
|
||||
// Really, we shouldn't be polled until receiver has consumed and wakes us.
|
||||
Poll::Pending
|
||||
}
|
||||
State::ReceiverGone => Poll::Ready(Err(SendError::ReceiverGone)),
|
||||
@@ -453,38 +449,4 @@ mod tests {
|
||||
let err = recv_task.await.unwrap().expect_err("should error");
|
||||
assert!(matches!(err, RecvError::SenderGone));
|
||||
}
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn test_receiver_drop_while_waiting_for_receiver_to_consume_unblocks_sender() {
|
||||
let (mut sender, receiver) = channel();
|
||||
|
||||
let state = receiver.state.clone();
|
||||
|
||||
sender.send((), |_, _| unreachable!()).await.unwrap();
|
||||
|
||||
assert!(matches!(&*state.value.lock().unwrap(), &State::HasData(_)));
|
||||
|
||||
let unmergeable = sender.send((), |_, _| Err(()));
|
||||
let mut unmergeable = std::pin::pin!(unmergeable);
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(FOREVER) => {},
|
||||
_ = &mut unmergeable => {
|
||||
panic!("unmergeable should not complete");
|
||||
},
|
||||
}
|
||||
|
||||
assert!(matches!(
|
||||
&*state.value.lock().unwrap(),
|
||||
&State::SenderWaitsForReceiverToConsume(_)
|
||||
));
|
||||
|
||||
drop(receiver);
|
||||
|
||||
assert!(matches!(
|
||||
&*state.value.lock().unwrap(),
|
||||
&State::ReceiverGone
|
||||
));
|
||||
|
||||
unmergeable.await.unwrap_err();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -97,8 +97,8 @@ use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
|
||||
use crate::DEFAULT_PG_VERSION;
|
||||
use crate::{disk_usage_eviction_task, tenant};
|
||||
use pageserver_api::models::{
|
||||
StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, TimelineGcRequest,
|
||||
TimelineInfo,
|
||||
CompactInfoResponse, StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest,
|
||||
TimelineGcRequest, TimelineInfo,
|
||||
};
|
||||
use utils::{
|
||||
auth::SwappableJwtAuth,
|
||||
@@ -2052,7 +2052,15 @@ async fn timeline_compact_info_handler(
|
||||
let tenant = state
|
||||
.tenant_manager
|
||||
.get_attached_tenant_shard(tenant_shard_id)?;
|
||||
let resp = tenant.get_scheduled_compaction_tasks(timeline_id);
|
||||
let res = tenant.get_scheduled_compaction_tasks(timeline_id);
|
||||
let mut resp = Vec::new();
|
||||
for item in res {
|
||||
resp.push(CompactInfoResponse {
|
||||
compact_key_range: item.compact_key_range,
|
||||
compact_lsn_range: item.compact_lsn_range,
|
||||
sub_compaction: item.sub_compaction,
|
||||
});
|
||||
}
|
||||
json_response(StatusCode::OK, resp)
|
||||
}
|
||||
.instrument(info_span!("timeline_compact_info", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
|
||||
|
||||
@@ -91,6 +91,15 @@ pub(crate) static STORAGE_TIME_GLOBAL: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static READ_NUM_LAYERS_VISITED: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"pageserver_layers_visited_per_read_global",
|
||||
"Number of layers visited to reconstruct one key",
|
||||
vec![1.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static VEC_READ_NUM_LAYERS_VISITED: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"pageserver_layers_visited_per_vectored_read_global",
|
||||
@@ -3885,6 +3894,7 @@ pub fn preinitialize_metrics(conf: &'static PageServerConf) {
|
||||
|
||||
// histograms
|
||||
[
|
||||
&READ_NUM_LAYERS_VISITED,
|
||||
&VEC_READ_NUM_LAYERS_VISITED,
|
||||
&WAIT_LSN_TIME,
|
||||
&WAL_REDO_TIME,
|
||||
|
||||
@@ -618,9 +618,6 @@ impl BatchedFeMessage {
|
||||
};
|
||||
let throttled = tokio::select! {
|
||||
throttled = shard.pagestream_throttle.throttle(tokens) => { throttled }
|
||||
_ = shard.cancel.cancelled() => {
|
||||
return Err(QueryError::Shutdown);
|
||||
}
|
||||
_ = cancel.cancelled() => {
|
||||
return Err(QueryError::Shutdown);
|
||||
}
|
||||
|
||||
@@ -21,7 +21,6 @@ use enumset::EnumSet;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::models;
|
||||
use pageserver_api::models::CompactInfoResponse;
|
||||
use pageserver_api::models::LsnLease;
|
||||
use pageserver_api::models::TimelineArchivalState;
|
||||
use pageserver_api::models::TimelineState;
|
||||
@@ -38,17 +37,20 @@ use remote_timeline_client::manifest::{
|
||||
};
|
||||
use remote_timeline_client::UploadQueueNotReadyError;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::Weak;
|
||||
use std::time::SystemTime;
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use timeline::compaction::GcCompactionQueue;
|
||||
use timeline::compaction::GcCompactJob;
|
||||
use timeline::compaction::ScheduledCompactionTask;
|
||||
use timeline::import_pgdata;
|
||||
use timeline::offload::offload_timeline;
|
||||
use timeline::offload::OffloadError;
|
||||
use timeline::CompactFlags;
|
||||
use timeline::CompactOptions;
|
||||
use timeline::CompactionError;
|
||||
use timeline::ShutdownMode;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::sync::watch;
|
||||
@@ -344,8 +346,10 @@ pub struct Tenant {
|
||||
/// Overhead of mutex is acceptable because compaction is done with a multi-second period.
|
||||
compaction_circuit_breaker: std::sync::Mutex<CircuitBreaker>,
|
||||
|
||||
/// Scheduled gc-compaction tasks.
|
||||
scheduled_compaction_tasks: std::sync::Mutex<HashMap<TimelineId, Arc<GcCompactionQueue>>>,
|
||||
/// Scheduled compaction tasks. Currently, this can only be populated by triggering
|
||||
/// a manual gc-compaction from the manual compaction API.
|
||||
scheduled_compaction_tasks:
|
||||
std::sync::Mutex<HashMap<TimelineId, VecDeque<ScheduledCompactionTask>>>,
|
||||
|
||||
/// If the tenant is in Activating state, notify this to encourage it
|
||||
/// to proceed to Active as soon as possible, rather than waiting for lazy
|
||||
@@ -2035,7 +2039,7 @@ impl Tenant {
|
||||
) -> Result<Arc<Timeline>, TimelineArchivalError> {
|
||||
info!("unoffloading timeline");
|
||||
|
||||
// We activate the timeline below manually, so this must be called on an active tenant.
|
||||
// We activate the timeline below manually, so this must be called on an active timeline.
|
||||
// We expect callers of this function to ensure this.
|
||||
match self.current_state() {
|
||||
TenantState::Activating { .. }
|
||||
@@ -2992,35 +2996,113 @@ impl Tenant {
|
||||
if has_pending_l0_compaction_task {
|
||||
Some(true)
|
||||
} else {
|
||||
let queue = {
|
||||
let guard = self.scheduled_compaction_tasks.lock().unwrap();
|
||||
guard.get(timeline_id).cloned()
|
||||
let mut has_pending_scheduled_compaction_task;
|
||||
let next_scheduled_compaction_task = {
|
||||
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
|
||||
if let Some(tline_pending_tasks) = guard.get_mut(timeline_id) {
|
||||
if !tline_pending_tasks.is_empty() {
|
||||
info!(
|
||||
"{} tasks left in the compaction schedule queue",
|
||||
tline_pending_tasks.len()
|
||||
);
|
||||
}
|
||||
let next_task = tline_pending_tasks.pop_front();
|
||||
has_pending_scheduled_compaction_task = !tline_pending_tasks.is_empty();
|
||||
next_task
|
||||
} else {
|
||||
has_pending_scheduled_compaction_task = false;
|
||||
None
|
||||
}
|
||||
};
|
||||
if let Some(queue) = queue {
|
||||
let has_pending_tasks = queue
|
||||
.iteration(cancel, ctx, &self.gc_block, timeline)
|
||||
.await?;
|
||||
Some(has_pending_tasks)
|
||||
} else {
|
||||
Some(false)
|
||||
if let Some(mut next_scheduled_compaction_task) = next_scheduled_compaction_task
|
||||
{
|
||||
if !next_scheduled_compaction_task
|
||||
.options
|
||||
.flags
|
||||
.contains(CompactFlags::EnhancedGcBottomMostCompaction)
|
||||
{
|
||||
warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", next_scheduled_compaction_task.options);
|
||||
} else if next_scheduled_compaction_task.options.sub_compaction {
|
||||
info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
|
||||
let jobs: Vec<GcCompactJob> = timeline
|
||||
.gc_compaction_split_jobs(
|
||||
GcCompactJob::from_compact_options(
|
||||
next_scheduled_compaction_task.options.clone(),
|
||||
),
|
||||
next_scheduled_compaction_task
|
||||
.options
|
||||
.sub_compaction_max_job_size_mb,
|
||||
)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?;
|
||||
if jobs.is_empty() {
|
||||
info!("no jobs to run, skipping scheduled compaction task");
|
||||
} else {
|
||||
has_pending_scheduled_compaction_task = true;
|
||||
let jobs_len = jobs.len();
|
||||
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
|
||||
let tline_pending_tasks = guard.entry(*timeline_id).or_default();
|
||||
for (idx, job) in jobs.into_iter().enumerate() {
|
||||
// Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions`
|
||||
// until we do further refactors to allow directly call `compact_with_gc`.
|
||||
let mut flags: EnumSet<CompactFlags> = EnumSet::default();
|
||||
flags |= CompactFlags::EnhancedGcBottomMostCompaction;
|
||||
if job.dry_run {
|
||||
flags |= CompactFlags::DryRun;
|
||||
}
|
||||
let options = CompactOptions {
|
||||
flags,
|
||||
sub_compaction: false,
|
||||
compact_key_range: Some(job.compact_key_range.into()),
|
||||
compact_lsn_range: Some(job.compact_lsn_range.into()),
|
||||
sub_compaction_max_job_size_mb: None,
|
||||
};
|
||||
tline_pending_tasks.push_back(if idx == jobs_len - 1 {
|
||||
ScheduledCompactionTask {
|
||||
options,
|
||||
// The last job in the queue sends the signal and releases the gc guard
|
||||
result_tx: next_scheduled_compaction_task
|
||||
.result_tx
|
||||
.take(),
|
||||
gc_block: next_scheduled_compaction_task
|
||||
.gc_block
|
||||
.take(),
|
||||
}
|
||||
} else {
|
||||
ScheduledCompactionTask {
|
||||
options,
|
||||
result_tx: None,
|
||||
gc_block: None,
|
||||
}
|
||||
});
|
||||
}
|
||||
info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len);
|
||||
}
|
||||
} else {
|
||||
let _ = timeline
|
||||
.compact_with_options(
|
||||
cancel,
|
||||
next_scheduled_compaction_task.options,
|
||||
ctx,
|
||||
)
|
||||
.instrument(info_span!("scheduled_compact_timeline", %timeline_id))
|
||||
.await?;
|
||||
if let Some(tx) = next_scheduled_compaction_task.result_tx.take() {
|
||||
// TODO: we can send compaction statistics in the future
|
||||
tx.send(()).ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(has_pending_scheduled_compaction_task)
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
has_pending_task |= pending_task_left.unwrap_or(false);
|
||||
if pending_task_left == Some(false) && *can_offload {
|
||||
pausable_failpoint!("before-timeline-auto-offload");
|
||||
match offload_timeline(self, timeline)
|
||||
offload_timeline(self, timeline)
|
||||
.instrument(info_span!("offload_timeline", %timeline_id))
|
||||
.await
|
||||
{
|
||||
Err(OffloadError::NotArchived) => {
|
||||
// Ignore this, we likely raced with unarchival
|
||||
Ok(())
|
||||
}
|
||||
other => other,
|
||||
}?;
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3033,32 +3115,34 @@ impl Tenant {
|
||||
}
|
||||
|
||||
/// Cancel scheduled compaction tasks
|
||||
pub(crate) fn cancel_scheduled_compaction(&self, timeline_id: TimelineId) {
|
||||
pub(crate) fn cancel_scheduled_compaction(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
) -> Vec<ScheduledCompactionTask> {
|
||||
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
|
||||
if let Some(q) = guard.get_mut(&timeline_id) {
|
||||
q.cancel_scheduled();
|
||||
if let Some(tline_pending_tasks) = guard.get_mut(&timeline_id) {
|
||||
let current_tline_pending_tasks = std::mem::take(tline_pending_tasks);
|
||||
current_tline_pending_tasks.into_iter().collect()
|
||||
} else {
|
||||
Vec::new()
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_scheduled_compaction_tasks(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
) -> Vec<CompactInfoResponse> {
|
||||
let res = {
|
||||
let guard = self.scheduled_compaction_tasks.lock().unwrap();
|
||||
guard.get(&timeline_id).map(|q| q.remaining_jobs())
|
||||
};
|
||||
let Some((running, remaining)) = res else {
|
||||
return Vec::new();
|
||||
};
|
||||
let mut result = Vec::new();
|
||||
if let Some((id, running)) = running {
|
||||
result.extend(running.into_compact_info_resp(id, true));
|
||||
}
|
||||
for (id, job) in remaining {
|
||||
result.extend(job.into_compact_info_resp(id, false));
|
||||
}
|
||||
result
|
||||
) -> Vec<CompactOptions> {
|
||||
use itertools::Itertools;
|
||||
let guard = self.scheduled_compaction_tasks.lock().unwrap();
|
||||
guard
|
||||
.get(&timeline_id)
|
||||
.map(|tline_pending_tasks| {
|
||||
tline_pending_tasks
|
||||
.iter()
|
||||
.map(|x| x.options.clone())
|
||||
.collect_vec()
|
||||
})
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
/// Schedule a compaction task for a timeline.
|
||||
@@ -3067,12 +3151,20 @@ impl Tenant {
|
||||
timeline_id: TimelineId,
|
||||
options: CompactOptions,
|
||||
) -> anyhow::Result<tokio::sync::oneshot::Receiver<()>> {
|
||||
let gc_guard = match self.gc_block.start().await {
|
||||
Ok(guard) => guard,
|
||||
Err(e) => {
|
||||
bail!("cannot run gc-compaction because gc is blocked: {}", e);
|
||||
}
|
||||
};
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
|
||||
let q = guard
|
||||
.entry(timeline_id)
|
||||
.or_insert_with(|| Arc::new(GcCompactionQueue::new()));
|
||||
q.schedule_manual_compaction(options, Some(tx));
|
||||
let tline_pending_tasks = guard.entry(timeline_id).or_default();
|
||||
tline_pending_tasks.push_back(ScheduledCompactionTask {
|
||||
options,
|
||||
result_tx: Some(tx),
|
||||
gc_block: Some(gc_guard),
|
||||
});
|
||||
Ok(rx)
|
||||
}
|
||||
|
||||
|
||||
@@ -304,15 +304,6 @@ pub enum WaitCompletionError {
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[error("Upload queue either in unexpected state or hasn't downloaded manifest yet")]
|
||||
pub struct UploadQueueNotReadyError;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ShutdownIfArchivedError {
|
||||
#[error(transparent)]
|
||||
NotInitialized(NotInitialized),
|
||||
#[error("timeline is not archived")]
|
||||
NotArchived,
|
||||
}
|
||||
|
||||
/// Behavioral modes that enable seamless live migration.
|
||||
///
|
||||
/// See docs/rfcs/028-pageserver-migration.md to understand how these fit in.
|
||||
@@ -825,55 +816,6 @@ impl RemoteTimelineClient {
|
||||
Ok(need_wait)
|
||||
}
|
||||
|
||||
/// Shuts the timeline client down, but only if the timeline is archived.
|
||||
///
|
||||
/// This function and [`Self::schedule_index_upload_for_timeline_archival_state`] use the
|
||||
/// same lock to prevent races between unarchival and offloading: unarchival requires the
|
||||
/// upload queue to be initialized, and leaves behind an upload queue where either dirty
|
||||
/// or clean has archived_at of `None`. offloading leaves behind an uninitialized upload
|
||||
/// queue.
|
||||
pub(crate) async fn shutdown_if_archived(
|
||||
self: &Arc<Self>,
|
||||
) -> Result<(), ShutdownIfArchivedError> {
|
||||
{
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard
|
||||
.initialized_mut()
|
||||
.map_err(ShutdownIfArchivedError::NotInitialized)?;
|
||||
|
||||
match (
|
||||
upload_queue.dirty.archived_at.is_none(),
|
||||
upload_queue.clean.0.archived_at.is_none(),
|
||||
) {
|
||||
// The expected case: the timeline is archived and we don't want to unarchive
|
||||
(false, false) => {}
|
||||
(true, false) => {
|
||||
tracing::info!("can't shut down timeline: timeline slated for unarchival");
|
||||
return Err(ShutdownIfArchivedError::NotArchived);
|
||||
}
|
||||
(dirty_archived, true) => {
|
||||
tracing::info!(%dirty_archived, "can't shut down timeline: timeline not archived in remote storage");
|
||||
return Err(ShutdownIfArchivedError::NotArchived);
|
||||
}
|
||||
}
|
||||
|
||||
// Set the shutting_down flag while the guard from the archival check is held.
|
||||
// This prevents a race with unarchival, as initialized_mut will not return
|
||||
// an upload queue from this point.
|
||||
// Also launch the queued tasks like shutdown() does.
|
||||
if !upload_queue.shutting_down {
|
||||
upload_queue.shutting_down = true;
|
||||
upload_queue.queued_operations.push_back(UploadOp::Shutdown);
|
||||
// this operation is not counted similar to Barrier
|
||||
self.launch_queued_tasks(upload_queue);
|
||||
}
|
||||
}
|
||||
|
||||
self.shutdown().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Launch an index-file upload operation in the background, setting `import_pgdata` field.
|
||||
pub(crate) fn schedule_index_upload_for_import_pgdata_state_update(
|
||||
self: &Arc<Self>,
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
//!
|
||||
//! The old legacy algorithm is implemented directly in `timeline.rs`.
|
||||
|
||||
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
|
||||
use std::collections::{BinaryHeap, HashMap, HashSet};
|
||||
use std::ops::{Deref, Range};
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -16,12 +16,10 @@ use super::{
|
||||
|
||||
use anyhow::{anyhow, bail, Context};
|
||||
use bytes::Bytes;
|
||||
use enumset::EnumSet;
|
||||
use fail::fail_point;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::key::KEY_SIZE;
|
||||
use pageserver_api::keyspace::ShardedRange;
|
||||
use pageserver_api::models::CompactInfoResponse;
|
||||
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
|
||||
use serde::Serialize;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -32,7 +30,6 @@ use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder}
|
||||
use crate::page_cache;
|
||||
use crate::statvfs::Statvfs;
|
||||
use crate::tenant::checks::check_valid_layermap;
|
||||
use crate::tenant::gc_block::GcBlock;
|
||||
use crate::tenant::remote_timeline_client::WaitCompletionError;
|
||||
use crate::tenant::storage_layer::batch_split_writer::{
|
||||
BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
|
||||
@@ -66,284 +63,16 @@ use super::CompactionError;
|
||||
/// Maximum number of deltas before generating an image layer in bottom-most compaction.
|
||||
const COMPACTION_DELTA_THRESHOLD: usize = 5;
|
||||
|
||||
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
|
||||
pub struct GcCompactionJobId(pub usize);
|
||||
|
||||
impl std::fmt::Display for GcCompactionJobId {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum GcCompactionQueueItem {
|
||||
Manual(CompactOptions),
|
||||
SubCompactionJob(CompactOptions),
|
||||
#[allow(dead_code)]
|
||||
UpdateL2Lsn(Lsn),
|
||||
Notify(GcCompactionJobId),
|
||||
}
|
||||
|
||||
impl GcCompactionQueueItem {
|
||||
pub fn into_compact_info_resp(
|
||||
self,
|
||||
id: GcCompactionJobId,
|
||||
running: bool,
|
||||
) -> Option<CompactInfoResponse> {
|
||||
match self {
|
||||
GcCompactionQueueItem::Manual(options) => Some(CompactInfoResponse {
|
||||
compact_key_range: options.compact_key_range,
|
||||
compact_lsn_range: options.compact_lsn_range,
|
||||
sub_compaction: options.sub_compaction,
|
||||
running,
|
||||
job_id: id.0,
|
||||
}),
|
||||
GcCompactionQueueItem::SubCompactionJob(options) => Some(CompactInfoResponse {
|
||||
compact_key_range: options.compact_key_range,
|
||||
compact_lsn_range: options.compact_lsn_range,
|
||||
sub_compaction: options.sub_compaction,
|
||||
running,
|
||||
job_id: id.0,
|
||||
}),
|
||||
GcCompactionQueueItem::UpdateL2Lsn(_) => None,
|
||||
GcCompactionQueueItem::Notify(_) => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct GcCompactionQueueInner {
|
||||
running: Option<(GcCompactionJobId, GcCompactionQueueItem)>,
|
||||
queued: VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
|
||||
notify: HashMap<GcCompactionJobId, tokio::sync::oneshot::Sender<()>>,
|
||||
gc_guards: HashMap<GcCompactionJobId, gc_block::Guard>,
|
||||
last_id: GcCompactionJobId,
|
||||
}
|
||||
|
||||
impl GcCompactionQueueInner {
|
||||
fn next_id(&mut self) -> GcCompactionJobId {
|
||||
let id = self.last_id;
|
||||
self.last_id = GcCompactionJobId(id.0 + 1);
|
||||
id
|
||||
}
|
||||
}
|
||||
|
||||
/// A structure to store gc_compaction jobs.
|
||||
pub struct GcCompactionQueue {
|
||||
/// All items in the queue, and the currently-running job.
|
||||
inner: std::sync::Mutex<GcCompactionQueueInner>,
|
||||
/// Ensure only one thread is consuming the queue.
|
||||
consumer_lock: tokio::sync::Mutex<()>,
|
||||
}
|
||||
|
||||
impl GcCompactionQueue {
|
||||
pub fn new() -> Self {
|
||||
GcCompactionQueue {
|
||||
inner: std::sync::Mutex::new(GcCompactionQueueInner {
|
||||
running: None,
|
||||
queued: VecDeque::new(),
|
||||
notify: HashMap::new(),
|
||||
gc_guards: HashMap::new(),
|
||||
last_id: GcCompactionJobId(0),
|
||||
}),
|
||||
consumer_lock: tokio::sync::Mutex::new(()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn cancel_scheduled(&self) {
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
guard.queued.clear();
|
||||
guard.notify.clear();
|
||||
guard.gc_guards.clear();
|
||||
}
|
||||
|
||||
/// Schedule a manual compaction job.
|
||||
pub fn schedule_manual_compaction(
|
||||
&self,
|
||||
options: CompactOptions,
|
||||
notify: Option<tokio::sync::oneshot::Sender<()>>,
|
||||
) -> GcCompactionJobId {
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
let id = guard.next_id();
|
||||
guard
|
||||
.queued
|
||||
.push_back((id, GcCompactionQueueItem::Manual(options)));
|
||||
if let Some(notify) = notify {
|
||||
guard.notify.insert(id, notify);
|
||||
}
|
||||
info!("scheduled compaction job id={}", id);
|
||||
id
|
||||
}
|
||||
|
||||
/// Trigger an auto compaction.
|
||||
#[allow(dead_code)]
|
||||
pub fn trigger_auto_compaction(&self, _: &Arc<Timeline>) {}
|
||||
|
||||
/// Notify the caller the job has finished and unblock GC.
|
||||
fn notify_and_unblock(&self, id: GcCompactionJobId) {
|
||||
info!("compaction job id={} finished", id);
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
if let Some(blocking) = guard.gc_guards.remove(&id) {
|
||||
drop(blocking)
|
||||
}
|
||||
if let Some(tx) = guard.notify.remove(&id) {
|
||||
let _ = tx.send(());
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_sub_compaction(
|
||||
&self,
|
||||
id: GcCompactionJobId,
|
||||
options: CompactOptions,
|
||||
timeline: &Arc<Timeline>,
|
||||
gc_block: &GcBlock,
|
||||
) -> Result<(), CompactionError> {
|
||||
info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
|
||||
let jobs: Vec<GcCompactJob> = timeline
|
||||
.gc_compaction_split_jobs(
|
||||
GcCompactJob::from_compact_options(options.clone()),
|
||||
options.sub_compaction_max_job_size_mb,
|
||||
)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?;
|
||||
if jobs.is_empty() {
|
||||
info!("no jobs to run, skipping scheduled compaction task");
|
||||
self.notify_and_unblock(id);
|
||||
} else {
|
||||
let gc_guard = match gc_block.start().await {
|
||||
Ok(guard) => guard,
|
||||
Err(e) => {
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"cannot run gc-compaction because gc is blocked: {}",
|
||||
e
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
let jobs_len = jobs.len();
|
||||
let mut pending_tasks = Vec::new();
|
||||
for job in jobs {
|
||||
// Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions`
|
||||
// until we do further refactors to allow directly call `compact_with_gc`.
|
||||
let mut flags: EnumSet<CompactFlags> = EnumSet::default();
|
||||
flags |= CompactFlags::EnhancedGcBottomMostCompaction;
|
||||
if job.dry_run {
|
||||
flags |= CompactFlags::DryRun;
|
||||
}
|
||||
let options = CompactOptions {
|
||||
flags,
|
||||
sub_compaction: false,
|
||||
compact_key_range: Some(job.compact_key_range.into()),
|
||||
compact_lsn_range: Some(job.compact_lsn_range.into()),
|
||||
sub_compaction_max_job_size_mb: None,
|
||||
};
|
||||
pending_tasks.push(GcCompactionQueueItem::SubCompactionJob(options));
|
||||
}
|
||||
pending_tasks.push(GcCompactionQueueItem::Notify(id));
|
||||
{
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
guard.gc_guards.insert(id, gc_guard);
|
||||
let mut tasks = Vec::new();
|
||||
for task in pending_tasks {
|
||||
let id = guard.next_id();
|
||||
tasks.push((id, task));
|
||||
}
|
||||
tasks.reverse();
|
||||
for item in tasks {
|
||||
guard.queued.push_front(item);
|
||||
}
|
||||
}
|
||||
info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Take a job from the queue and process it. Returns if there are still pending tasks.
|
||||
pub async fn iteration(
|
||||
&self,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
gc_block: &GcBlock,
|
||||
timeline: &Arc<Timeline>,
|
||||
) -> Result<bool, CompactionError> {
|
||||
let _one_op_at_a_time_guard = self.consumer_lock.lock().await;
|
||||
let has_pending_tasks;
|
||||
let (id, item) = {
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
let Some((id, item)) = guard.queued.pop_front() else {
|
||||
return Ok(false);
|
||||
};
|
||||
guard.running = Some((id, item.clone()));
|
||||
has_pending_tasks = !guard.queued.is_empty();
|
||||
(id, item)
|
||||
};
|
||||
|
||||
match item {
|
||||
GcCompactionQueueItem::Manual(options) => {
|
||||
if !options
|
||||
.flags
|
||||
.contains(CompactFlags::EnhancedGcBottomMostCompaction)
|
||||
{
|
||||
warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", options);
|
||||
} else if options.sub_compaction {
|
||||
self.handle_sub_compaction(id, options, timeline, gc_block)
|
||||
.await?;
|
||||
} else {
|
||||
let gc_guard = match gc_block.start().await {
|
||||
Ok(guard) => guard,
|
||||
Err(e) => {
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"cannot run gc-compaction because gc is blocked: {}",
|
||||
e
|
||||
)));
|
||||
}
|
||||
};
|
||||
{
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
guard.gc_guards.insert(id, gc_guard);
|
||||
}
|
||||
let _ = timeline
|
||||
.compact_with_options(cancel, options, ctx)
|
||||
.instrument(info_span!("scheduled_compact_timeline", %timeline.timeline_id))
|
||||
.await?;
|
||||
self.notify_and_unblock(id);
|
||||
}
|
||||
}
|
||||
GcCompactionQueueItem::SubCompactionJob(options) => {
|
||||
let _ = timeline
|
||||
.compact_with_options(cancel, options, ctx)
|
||||
.instrument(info_span!("scheduled_compact_timeline", %timeline.timeline_id))
|
||||
.await?;
|
||||
}
|
||||
GcCompactionQueueItem::Notify(id) => {
|
||||
self.notify_and_unblock(id);
|
||||
}
|
||||
GcCompactionQueueItem::UpdateL2Lsn(_) => {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
{
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
guard.running = None;
|
||||
}
|
||||
Ok(has_pending_tasks)
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub fn remaining_jobs(
|
||||
&self,
|
||||
) -> (
|
||||
Option<(GcCompactionJobId, GcCompactionQueueItem)>,
|
||||
VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
|
||||
) {
|
||||
let guard = self.inner.lock().unwrap();
|
||||
(guard.running.clone(), guard.queued.clone())
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn remaining_jobs_num(&self) -> usize {
|
||||
let guard = self.inner.lock().unwrap();
|
||||
guard.queued.len() + if guard.running.is_some() { 1 } else { 0 }
|
||||
}
|
||||
/// A scheduled compaction task.
|
||||
pub(crate) struct ScheduledCompactionTask {
|
||||
/// It's unfortunate that we need to store a compact options struct here because the only outer
|
||||
/// API we can call here is `compact_with_options` which does a few setup calls before starting the
|
||||
/// actual compaction job... We should refactor this to store `GcCompactionJob` in the future.
|
||||
pub options: CompactOptions,
|
||||
/// The channel to send the compaction result. If this is a subcompaction, the last compaction job holds the sender.
|
||||
pub result_tx: Option<tokio::sync::oneshot::Sender<()>>,
|
||||
/// Hold the GC block. If this is a subcompaction, the last compaction job holds the gc block guard.
|
||||
pub gc_block: Option<gc_block::Guard>,
|
||||
}
|
||||
|
||||
/// A job description for the gc-compaction job. This structure describes the rectangle range that the job will
|
||||
|
||||
@@ -194,9 +194,7 @@ impl DeleteTimelineFlow {
|
||||
super::debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let allow_offloaded_children = false;
|
||||
let set_stopping = true;
|
||||
let (timeline, mut guard) =
|
||||
Self::prepare(tenant, timeline_id, allow_offloaded_children, set_stopping)?;
|
||||
let (timeline, mut guard) = Self::prepare(tenant, timeline_id, allow_offloaded_children)?;
|
||||
|
||||
guard.mark_in_progress()?;
|
||||
|
||||
@@ -336,7 +334,6 @@ impl DeleteTimelineFlow {
|
||||
tenant: &Tenant,
|
||||
timeline_id: TimelineId,
|
||||
allow_offloaded_children: bool,
|
||||
set_stopping: bool,
|
||||
) -> Result<(TimelineOrOffloaded, DeletionGuard), DeleteTimelineError> {
|
||||
// Note the interaction between this guard and deletion guard.
|
||||
// Here we attempt to lock deletion guard when we're holding a lock on timelines.
|
||||
@@ -392,10 +389,8 @@ impl DeleteTimelineFlow {
|
||||
}
|
||||
};
|
||||
|
||||
if set_stopping {
|
||||
if let TimelineOrOffloaded::Timeline(timeline) = &timeline {
|
||||
timeline.set_state(TimelineState::Stopping);
|
||||
}
|
||||
if let TimelineOrOffloaded::Timeline(timeline) = &timeline {
|
||||
timeline.set_state(TimelineState::Stopping);
|
||||
}
|
||||
|
||||
Ok((timeline, delete_lock_guard))
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use pageserver_api::models::{TenantState, TimelineState};
|
||||
use pageserver_api::models::TenantState;
|
||||
|
||||
use super::delete::{delete_local_timeline_directory, DeleteTimelineFlow, DeletionGuard};
|
||||
use super::Timeline;
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::tenant::remote_timeline_client::ShutdownIfArchivedError;
|
||||
use crate::tenant::{OffloadedTimeline, Tenant, TenantManifestError, TimelineOrOffloaded};
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
@@ -37,29 +36,28 @@ pub(crate) async fn offload_timeline(
|
||||
tracing::info!("offloading archived timeline");
|
||||
|
||||
let allow_offloaded_children = true;
|
||||
let set_stopping = false;
|
||||
let (timeline, guard) = DeleteTimelineFlow::prepare(
|
||||
tenant,
|
||||
timeline.timeline_id,
|
||||
allow_offloaded_children,
|
||||
set_stopping,
|
||||
)
|
||||
.map_err(|e| OffloadError::Other(anyhow::anyhow!(e)))?;
|
||||
let (timeline, guard) =
|
||||
DeleteTimelineFlow::prepare(tenant, timeline.timeline_id, allow_offloaded_children)
|
||||
.map_err(|e| OffloadError::Other(anyhow::anyhow!(e)))?;
|
||||
|
||||
let TimelineOrOffloaded::Timeline(timeline) = timeline else {
|
||||
tracing::error!("timeline already offloaded, but given timeline object");
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
match timeline.remote_client.shutdown_if_archived().await {
|
||||
Ok(()) => {}
|
||||
Err(ShutdownIfArchivedError::NotInitialized(_)) => {
|
||||
// Either the timeline is being deleted, the operation is being retried, or we are shutting down.
|
||||
// Don't return cancelled here to keep it idempotent.
|
||||
let is_archived = timeline.is_archived();
|
||||
match is_archived {
|
||||
Some(true) => (),
|
||||
Some(false) => {
|
||||
tracing::warn!("tried offloading a non-archived timeline");
|
||||
return Err(OffloadError::NotArchived);
|
||||
}
|
||||
None => {
|
||||
// This is legal: calls to this function can race with the timeline shutting down
|
||||
tracing::info!("tried offloading a timeline whose remote storage is not initialized");
|
||||
return Err(OffloadError::Cancelled);
|
||||
}
|
||||
Err(ShutdownIfArchivedError::NotArchived) => return Err(OffloadError::NotArchived),
|
||||
}
|
||||
timeline.set_state(TimelineState::Stopping);
|
||||
|
||||
// Now that the Timeline is in Stopping state, request all the related tasks to shut down.
|
||||
timeline.shutdown(super::ShutdownMode::Reload).await;
|
||||
|
||||
26
pgxn/hnsw/Makefile
Normal file
26
pgxn/hnsw/Makefile
Normal file
@@ -0,0 +1,26 @@
|
||||
EXTENSION = hnsw
|
||||
EXTVERSION = 0.1.0
|
||||
|
||||
MODULE_big = hnsw
|
||||
DATA = $(wildcard *--*.sql)
|
||||
OBJS = hnsw.o hnswalg.o
|
||||
|
||||
TESTS = $(wildcard test/sql/*.sql)
|
||||
REGRESS = $(patsubst test/sql/%.sql,%,$(TESTS))
|
||||
REGRESS_OPTS = --inputdir=test --load-extension=hnsw
|
||||
|
||||
# For auto-vectorization:
|
||||
# - GCC (needs -ftree-vectorize OR -O3) - https://gcc.gnu.org/projects/tree-ssa/vectorization.html
|
||||
PG_CFLAGS += -O3
|
||||
PG_CXXFLAGS += -O3 -std=c++11
|
||||
PG_LDFLAGS += -lstdc++
|
||||
|
||||
all: $(EXTENSION)--$(EXTVERSION).sql
|
||||
|
||||
PG_CONFIG ?= pg_config
|
||||
PGXS := $(shell $(PG_CONFIG) --pgxs)
|
||||
include $(PGXS)
|
||||
|
||||
dist:
|
||||
mkdir -p dist
|
||||
git archive --format zip --prefix=$(EXTENSION)-$(EXTVERSION)/ --output dist/$(EXTENSION)-$(EXTVERSION).zip master
|
||||
25
pgxn/hnsw/README.md
Normal file
25
pgxn/hnsw/README.md
Normal file
@@ -0,0 +1,25 @@
|
||||
# Revisiting the Inverted Indices for Billion-Scale Approximate Nearest Neighbors
|
||||
|
||||
This ANN extension of Postgres is based
|
||||
on [ivf-hnsw](https://github.com/dbaranchuk/ivf-hnsw.git) implementation of [HNSW](https://www.pinecone.io/learn/hnsw),
|
||||
the code for the current state-of-the-art billion-scale nearest neighbor search system presented in the paper:
|
||||
|
||||
[Revisiting the Inverted Indices for Billion-Scale Approximate Nearest Neighbors](http://openaccess.thecvf.com/content_ECCV_2018/html/Dmitry_Baranchuk_Revisiting_the_Inverted_ECCV_2018_paper.html),
|
||||
<br>
|
||||
Dmitry Baranchuk, Artem Babenko, Yury Malkov
|
||||
|
||||
# Postgres extension
|
||||
|
||||
HNSW index is hold in memory (built on demand) and it's maxial size is limited
|
||||
by `maxelements` index parameter. Another required parameter is nubmer of dimensions (if it is not specified in column type).
|
||||
Optional parameter `ef` specifies number of neighbors which are considered during index construction and search (corresponds `efConstruction` and `efSearch` parameters
|
||||
described in the article).
|
||||
|
||||
# Example of usage:
|
||||
|
||||
```
|
||||
create extension hnsw;
|
||||
create table embeddings(id integer primary key, payload real[]);
|
||||
create index on embeddings using hnsw(payload) with (maxelements=1000000, dims=100, m=32);
|
||||
select id from embeddings order by payload <-> array[1.0, 2.0,...] limit 100;
|
||||
```
|
||||
29
pgxn/hnsw/hnsw--0.1.0.sql
Normal file
29
pgxn/hnsw/hnsw--0.1.0.sql
Normal file
@@ -0,0 +1,29 @@
|
||||
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
|
||||
\echo Use "CREATE EXTENSION hnsw" to load this file. \quit
|
||||
|
||||
-- functions
|
||||
|
||||
CREATE FUNCTION l2_distance(real[], real[]) RETURNS real
|
||||
AS 'MODULE_PATHNAME' LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
|
||||
|
||||
-- operators
|
||||
|
||||
CREATE OPERATOR <-> (
|
||||
LEFTARG = real[], RIGHTARG = real[], PROCEDURE = l2_distance,
|
||||
COMMUTATOR = '<->'
|
||||
);
|
||||
|
||||
-- access method
|
||||
|
||||
CREATE FUNCTION hnsw_handler(internal) RETURNS index_am_handler
|
||||
AS 'MODULE_PATHNAME' LANGUAGE C;
|
||||
|
||||
CREATE ACCESS METHOD hnsw TYPE INDEX HANDLER hnsw_handler;
|
||||
|
||||
COMMENT ON ACCESS METHOD hnsw IS 'hnsw index access method';
|
||||
|
||||
-- opclasses
|
||||
|
||||
CREATE OPERATOR CLASS knn_ops
|
||||
DEFAULT FOR TYPE real[] USING hnsw AS
|
||||
OPERATOR 1 <-> (real[], real[]) FOR ORDER BY float_ops;
|
||||
590
pgxn/hnsw/hnsw.c
Normal file
590
pgxn/hnsw/hnsw.c
Normal file
@@ -0,0 +1,590 @@
|
||||
#include "postgres.h"
|
||||
|
||||
#include "access/amapi.h"
|
||||
#include "access/generic_xlog.h"
|
||||
#include "access/relation.h"
|
||||
#include "access/reloptions.h"
|
||||
#include "access/tableam.h"
|
||||
#include "catalog/index.h"
|
||||
#include "commands/vacuum.h"
|
||||
#include "nodes/execnodes.h"
|
||||
#include "storage/bufmgr.h"
|
||||
#include "utils/guc.h"
|
||||
#include "utils/selfuncs.h"
|
||||
|
||||
#include <math.h>
|
||||
#include <float.h>
|
||||
|
||||
#include "hnsw.h"
|
||||
|
||||
PG_MODULE_MAGIC;
|
||||
|
||||
typedef struct {
|
||||
int32 vl_len_; /* varlena header (do not touch directly!) */
|
||||
int dims;
|
||||
int maxelements;
|
||||
int efConstruction;
|
||||
int efSearch;
|
||||
int M;
|
||||
} HnswOptions;
|
||||
|
||||
static relopt_kind hnsw_relopt_kind;
|
||||
|
||||
typedef struct {
|
||||
HierarchicalNSW* hnsw;
|
||||
size_t curr;
|
||||
size_t n_results;
|
||||
ItemPointer results;
|
||||
} HnswScanOpaqueData;
|
||||
|
||||
typedef HnswScanOpaqueData* HnswScanOpaque;
|
||||
|
||||
typedef struct {
|
||||
Oid relid;
|
||||
uint32 status;
|
||||
HierarchicalNSW* hnsw;
|
||||
} HnswHashEntry;
|
||||
|
||||
|
||||
#define SH_PREFIX hnsw_index
|
||||
#define SH_ELEMENT_TYPE HnswHashEntry
|
||||
#define SH_KEY_TYPE Oid
|
||||
#define SH_KEY relid
|
||||
#define SH_STORE_HASH
|
||||
#define SH_GET_HASH(tb, a) ((a)->relid)
|
||||
#define SH_HASH_KEY(tb, key) (key)
|
||||
#define SH_EQUAL(tb, a, b) ((a) == (b))
|
||||
#define SH_SCOPE static inline
|
||||
#define SH_DEFINE
|
||||
#define SH_DECLARE
|
||||
#include "lib/simplehash.h"
|
||||
|
||||
#define INDEX_HASH_SIZE 11
|
||||
|
||||
#define DEFAULT_EF_SEARCH 64
|
||||
|
||||
PGDLLEXPORT void _PG_init(void);
|
||||
|
||||
static hnsw_index_hash *hnsw_indexes;
|
||||
|
||||
/*
|
||||
* Initialize index options and variables
|
||||
*/
|
||||
void
|
||||
_PG_init(void)
|
||||
{
|
||||
hnsw_relopt_kind = add_reloption_kind();
|
||||
add_int_reloption(hnsw_relopt_kind, "dims", "Number of dimensions",
|
||||
0, 0, INT_MAX, AccessExclusiveLock);
|
||||
add_int_reloption(hnsw_relopt_kind, "maxelements", "Maximal number of elements",
|
||||
0, 0, INT_MAX, AccessExclusiveLock);
|
||||
add_int_reloption(hnsw_relopt_kind, "m", "Number of neighbors of each vertex",
|
||||
100, 0, INT_MAX, AccessExclusiveLock);
|
||||
add_int_reloption(hnsw_relopt_kind, "efconstruction", "Number of inspected neighbors during index construction",
|
||||
16, 1, INT_MAX, AccessExclusiveLock);
|
||||
add_int_reloption(hnsw_relopt_kind, "efsearch", "Number of inspected neighbors during index search",
|
||||
64, 1, INT_MAX, AccessExclusiveLock);
|
||||
hnsw_indexes = hnsw_index_create(TopMemoryContext, INDEX_HASH_SIZE, NULL);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
hnsw_build_callback(Relation index, ItemPointer tid, Datum *values,
|
||||
bool *isnull, bool tupleIsAlive, void *state)
|
||||
{
|
||||
HierarchicalNSW* hnsw = (HierarchicalNSW*) state;
|
||||
ArrayType* array;
|
||||
int n_items;
|
||||
label_t label = 0;
|
||||
|
||||
/* Skip nulls */
|
||||
if (isnull[0])
|
||||
return;
|
||||
|
||||
array = DatumGetArrayTypeP(values[0]);
|
||||
n_items = ArrayGetNItems(ARR_NDIM(array), ARR_DIMS(array));
|
||||
if (n_items != hnsw_dimensions(hnsw))
|
||||
{
|
||||
elog(ERROR, "Wrong number of dimensions: %d instead of %d expected",
|
||||
n_items, hnsw_dimensions(hnsw));
|
||||
}
|
||||
|
||||
memcpy(&label, tid, sizeof(*tid));
|
||||
hnsw_add_point(hnsw, (coord_t*)ARR_DATA_PTR(array), label);
|
||||
}
|
||||
|
||||
static void
|
||||
hnsw_populate(HierarchicalNSW* hnsw, Relation indexRel, Relation heapRel)
|
||||
{
|
||||
IndexInfo* indexInfo = BuildIndexInfo(indexRel);
|
||||
Assert(indexInfo->ii_NumIndexAttrs == 1);
|
||||
table_index_build_scan(heapRel, indexRel, indexInfo,
|
||||
true, true, hnsw_build_callback, (void *) hnsw, NULL);
|
||||
}
|
||||
|
||||
#ifdef __APPLE__
|
||||
|
||||
#include <sys/types.h>
|
||||
#include <sys/sysctl.h>
|
||||
|
||||
static void
|
||||
hnsw_check_available_memory(Size requested)
|
||||
{
|
||||
size_t total;
|
||||
if (sysctlbyname("hw.memsize", NULL, &total, NULL, 0) < 0)
|
||||
elog(ERROR, "Failed to get amount of RAM: %m");
|
||||
|
||||
if ((Size)NBuffers*BLCKSZ + requested >= total)
|
||||
elog(ERROR, "HNSW index requeries %ld bytes while only %ld are available",
|
||||
requested, total - (Size)NBuffers*BLCKSZ);
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
#include <sys/sysinfo.h>
|
||||
|
||||
static void
|
||||
hnsw_check_available_memory(Size requested)
|
||||
{
|
||||
struct sysinfo si;
|
||||
Size total;
|
||||
if (sysinfo(&si) < 0)
|
||||
elog(ERROR, "Failed to get amount of RAM: %m");
|
||||
|
||||
total = si.totalram*si.mem_unit;
|
||||
if ((Size)NBuffers*BLCKSZ + requested >= total)
|
||||
elog(ERROR, "HNSW index requeries %ld bytes while only %ld are available",
|
||||
requested, total - (Size)NBuffers*BLCKSZ);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
static HierarchicalNSW*
|
||||
hnsw_get_index(Relation indexRel, Relation heapRel)
|
||||
{
|
||||
HierarchicalNSW* hnsw;
|
||||
Oid indexoid = RelationGetRelid(indexRel);
|
||||
HnswHashEntry* entry = hnsw_index_lookup(hnsw_indexes, indexoid);
|
||||
if (entry == NULL)
|
||||
{
|
||||
size_t dims, maxelements;
|
||||
size_t M;
|
||||
size_t maxM;
|
||||
size_t size_links_level0;
|
||||
size_t size_data_per_element;
|
||||
size_t data_size;
|
||||
dsm_handle handle = indexoid << 1; /* make it even */
|
||||
void* impl_private = NULL;
|
||||
void* mapped_address = NULL;
|
||||
Size mapped_size = 0;
|
||||
Size shmem_size;
|
||||
bool exists = true;
|
||||
bool found;
|
||||
HnswOptions *opts = (HnswOptions *) indexRel->rd_options;
|
||||
if (opts == NULL || opts->maxelements == 0 || opts->dims == 0) {
|
||||
elog(ERROR, "HNSW index requires 'maxelements' and 'dims' to be specified");
|
||||
}
|
||||
dims = opts->dims;
|
||||
maxelements = opts->maxelements;
|
||||
M = opts->M;
|
||||
maxM = M * 2;
|
||||
data_size = dims * sizeof(coord_t);
|
||||
size_links_level0 = (maxM + 1) * sizeof(idx_t);
|
||||
size_data_per_element = size_links_level0 + data_size + sizeof(label_t);
|
||||
shmem_size = hnsw_sizeof() + maxelements * size_data_per_element;
|
||||
|
||||
hnsw_check_available_memory(shmem_size);
|
||||
|
||||
/* first try to attach to existed index */
|
||||
if (!dsm_impl_op(DSM_OP_ATTACH, handle, 0, &impl_private,
|
||||
&mapped_address, &mapped_size, DEBUG1))
|
||||
{
|
||||
/* index doesn't exists: try to create it */
|
||||
if (!dsm_impl_op(DSM_OP_CREATE, handle, shmem_size, &impl_private,
|
||||
&mapped_address, &mapped_size, DEBUG1))
|
||||
{
|
||||
/* We can do it under shared lock, so some other backend may
|
||||
* try to initialize index. If create is failed because index already
|
||||
* created by somebody else, then try to attach to it once again
|
||||
*/
|
||||
if (!dsm_impl_op(DSM_OP_ATTACH, handle, 0, &impl_private,
|
||||
&mapped_address, &mapped_size, ERROR))
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
exists = false;
|
||||
}
|
||||
}
|
||||
Assert(mapped_size == shmem_size);
|
||||
hnsw = (HierarchicalNSW*)mapped_address;
|
||||
|
||||
if (!exists)
|
||||
{
|
||||
hnsw_init(hnsw, dims, maxelements, M, maxM, opts->efConstruction);
|
||||
hnsw_populate(hnsw, indexRel, heapRel);
|
||||
}
|
||||
entry = hnsw_index_insert(hnsw_indexes, indexoid, &found);
|
||||
Assert(!found);
|
||||
entry->hnsw = hnsw;
|
||||
}
|
||||
else
|
||||
{
|
||||
hnsw = entry->hnsw;
|
||||
}
|
||||
return hnsw;
|
||||
}
|
||||
|
||||
/*
|
||||
* Start or restart an index scan
|
||||
*/
|
||||
static IndexScanDesc
|
||||
hnsw_beginscan(Relation index, int nkeys, int norderbys)
|
||||
{
|
||||
IndexScanDesc scan = RelationGetIndexScan(index, nkeys, norderbys);
|
||||
HnswScanOpaque so = (HnswScanOpaque) palloc(sizeof(HnswScanOpaqueData));
|
||||
Relation heap = relation_open(index->rd_index->indrelid, NoLock);
|
||||
so->hnsw = hnsw_get_index(index, heap);
|
||||
relation_close(heap, NoLock);
|
||||
so->curr = 0;
|
||||
so->n_results = 0;
|
||||
so->results = NULL;
|
||||
scan->opaque = so;
|
||||
return scan;
|
||||
}
|
||||
|
||||
/*
|
||||
* Start or restart an index scan
|
||||
*/
|
||||
static void
|
||||
hnsw_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int norderbys)
|
||||
{
|
||||
HnswScanOpaque so = (HnswScanOpaque) scan->opaque;
|
||||
if (so->results)
|
||||
{
|
||||
pfree(so->results);
|
||||
so->results = NULL;
|
||||
}
|
||||
so->curr = 0;
|
||||
if (orderbys && scan->numberOfOrderBys > 0)
|
||||
memmove(scan->orderByData, orderbys, scan->numberOfOrderBys * sizeof(ScanKeyData));
|
||||
}
|
||||
|
||||
/*
|
||||
* Fetch the next tuple in the given scan
|
||||
*/
|
||||
static bool
|
||||
hnsw_gettuple(IndexScanDesc scan, ScanDirection dir)
|
||||
{
|
||||
HnswScanOpaque so = (HnswScanOpaque) scan->opaque;
|
||||
|
||||
/*
|
||||
* Index can be used to scan backward, but Postgres doesn't support
|
||||
* backward scan on operators
|
||||
*/
|
||||
Assert(ScanDirectionIsForward(dir));
|
||||
|
||||
if (so->curr == 0)
|
||||
{
|
||||
Datum value;
|
||||
ArrayType* array;
|
||||
int n_items;
|
||||
size_t n_results;
|
||||
label_t* results;
|
||||
HnswOptions *opts = (HnswOptions *) scan->indexRelation->rd_options;
|
||||
size_t efSearch = opts ? opts->efSearch : DEFAULT_EF_SEARCH;
|
||||
|
||||
/* Safety check */
|
||||
if (scan->orderByData == NULL)
|
||||
elog(ERROR, "cannot scan HNSW index without order");
|
||||
|
||||
/* No items will match if null */
|
||||
if (scan->orderByData->sk_flags & SK_ISNULL)
|
||||
return false;
|
||||
|
||||
value = scan->orderByData->sk_argument;
|
||||
array = DatumGetArrayTypeP(value);
|
||||
n_items = ArrayGetNItems(ARR_NDIM(array), ARR_DIMS(array));
|
||||
if (n_items != hnsw_dimensions(so->hnsw))
|
||||
{
|
||||
elog(ERROR, "Wrong number of dimensions: %d instead of %d expected",
|
||||
n_items, hnsw_dimensions(so->hnsw));
|
||||
}
|
||||
|
||||
if (!hnsw_search(so->hnsw, (coord_t*)ARR_DATA_PTR(array), efSearch, &n_results, &results))
|
||||
elog(ERROR, "HNSW index search failed");
|
||||
so->results = (ItemPointer)palloc(n_results*sizeof(ItemPointerData));
|
||||
so->n_results = n_results;
|
||||
for (size_t i = 0; i < n_results; i++)
|
||||
{
|
||||
memcpy(&so->results[i], &results[i], sizeof(so->results[i]));
|
||||
}
|
||||
free(results);
|
||||
}
|
||||
if (so->curr >= so->n_results)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
scan->xs_heaptid = so->results[so->curr++];
|
||||
scan->xs_recheckorderby = false;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* End a scan and release resources
|
||||
*/
|
||||
static void
|
||||
hnsw_endscan(IndexScanDesc scan)
|
||||
{
|
||||
HnswScanOpaque so = (HnswScanOpaque) scan->opaque;
|
||||
if (so->results)
|
||||
pfree(so->results);
|
||||
pfree(so);
|
||||
scan->opaque = NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Estimate the cost of an index scan
|
||||
*/
|
||||
static void
|
||||
hnsw_costestimate(PlannerInfo *root, IndexPath *path, double loop_count,
|
||||
Cost *indexStartupCost, Cost *indexTotalCost,
|
||||
Selectivity *indexSelectivity, double *indexCorrelation
|
||||
,double *indexPages
|
||||
)
|
||||
{
|
||||
GenericCosts costs;
|
||||
|
||||
/* Never use index without order */
|
||||
if (path->indexorderbys == NULL)
|
||||
{
|
||||
*indexStartupCost = DBL_MAX;
|
||||
*indexTotalCost = DBL_MAX;
|
||||
*indexSelectivity = 0;
|
||||
*indexCorrelation = 0;
|
||||
*indexPages = 0;
|
||||
return;
|
||||
}
|
||||
|
||||
MemSet(&costs, 0, sizeof(costs));
|
||||
|
||||
genericcostestimate(root, path, loop_count, &costs);
|
||||
|
||||
/* Startup cost and total cost are same */
|
||||
*indexStartupCost = costs.indexTotalCost;
|
||||
*indexTotalCost = costs.indexTotalCost;
|
||||
*indexSelectivity = costs.indexSelectivity;
|
||||
*indexCorrelation = costs.indexCorrelation;
|
||||
*indexPages = costs.numIndexPages;
|
||||
}
|
||||
|
||||
/*
|
||||
* Parse and validate the reloptions
|
||||
*/
|
||||
static bytea *
|
||||
hnsw_options(Datum reloptions, bool validate)
|
||||
{
|
||||
static const relopt_parse_elt tab[] = {
|
||||
{"dims", RELOPT_TYPE_INT, offsetof(HnswOptions, dims)},
|
||||
{"maxelements", RELOPT_TYPE_INT, offsetof(HnswOptions, maxelements)},
|
||||
{"efconstruction", RELOPT_TYPE_INT, offsetof(HnswOptions, efConstruction)},
|
||||
{"efsearch", RELOPT_TYPE_INT, offsetof(HnswOptions, efSearch)},
|
||||
{"m", RELOPT_TYPE_INT, offsetof(HnswOptions, M)}
|
||||
};
|
||||
|
||||
return (bytea *) build_reloptions(reloptions, validate,
|
||||
hnsw_relopt_kind,
|
||||
sizeof(HnswOptions),
|
||||
tab, lengthof(tab));
|
||||
}
|
||||
|
||||
/*
|
||||
* Validate catalog entries for the specified operator class
|
||||
*/
|
||||
static bool
|
||||
hnsw_validate(Oid opclassoid)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Build the index for a logged table
|
||||
*/
|
||||
static IndexBuildResult *
|
||||
hnsw_build(Relation heap, Relation index, IndexInfo *indexInfo)
|
||||
{
|
||||
HierarchicalNSW* hnsw = hnsw_get_index(index, heap);
|
||||
IndexBuildResult* result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
|
||||
result->heap_tuples = result->index_tuples = hnsw_count(hnsw);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/*
|
||||
* Insert a tuple into the index
|
||||
*/
|
||||
static bool
|
||||
hnsw_insert(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid,
|
||||
Relation heap, IndexUniqueCheck checkUnique,
|
||||
bool indexUnchanged,
|
||||
IndexInfo *indexInfo)
|
||||
{
|
||||
HierarchicalNSW* hnsw = hnsw_get_index(index, heap);
|
||||
Datum value;
|
||||
ArrayType* array;
|
||||
int n_items;
|
||||
label_t label = 0;
|
||||
|
||||
/* Skip nulls */
|
||||
if (isnull[0])
|
||||
return false;
|
||||
|
||||
/* Detoast value */
|
||||
value = PointerGetDatum(PG_DETOAST_DATUM(values[0]));
|
||||
array = DatumGetArrayTypeP(value);
|
||||
n_items = ArrayGetNItems(ARR_NDIM(array), ARR_DIMS(array));
|
||||
if (n_items != hnsw_dimensions(hnsw))
|
||||
{
|
||||
elog(ERROR, "Wrong number of dimensions: %d instead of %d expected",
|
||||
n_items, hnsw_dimensions(hnsw));
|
||||
}
|
||||
memcpy(&label, heap_tid, sizeof(*heap_tid));
|
||||
if (!hnsw_add_point(hnsw, (coord_t*)ARR_DATA_PTR(array), label))
|
||||
elog(ERROR, "HNSW index insert failed");
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Build the index for an unlogged table
|
||||
*/
|
||||
static void
|
||||
hnsw_buildempty(Relation index)
|
||||
{
|
||||
/* index will be constructed on dema nd when accessed */
|
||||
}
|
||||
|
||||
/*
|
||||
* Clean up after a VACUUM operation
|
||||
*/
|
||||
static IndexBulkDeleteResult *
|
||||
hnsw_vacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
|
||||
{
|
||||
Relation rel = info->index;
|
||||
|
||||
if (stats == NULL)
|
||||
return NULL;
|
||||
|
||||
stats->num_pages = RelationGetNumberOfBlocks(rel);
|
||||
|
||||
return stats;
|
||||
}
|
||||
|
||||
/*
|
||||
* Bulk delete tuples from the index
|
||||
*/
|
||||
static IndexBulkDeleteResult *
|
||||
hnsw_bulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
|
||||
IndexBulkDeleteCallback callback, void *callback_state)
|
||||
{
|
||||
if (stats == NULL)
|
||||
stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
|
||||
return stats;
|
||||
}
|
||||
|
||||
/*
|
||||
* Define index handler
|
||||
*
|
||||
* See https://www.postgresql.org/docs/current/index-api.html
|
||||
*/
|
||||
PGDLLEXPORT PG_FUNCTION_INFO_V1(hnsw_handler);
|
||||
Datum
|
||||
hnsw_handler(PG_FUNCTION_ARGS)
|
||||
{
|
||||
IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
|
||||
|
||||
amroutine->amstrategies = 0;
|
||||
amroutine->amsupport = 0;
|
||||
amroutine->amoptsprocnum = 0;
|
||||
amroutine->amcanorder = false;
|
||||
amroutine->amcanorderbyop = true;
|
||||
amroutine->amcanbackward = false; /* can change direction mid-scan */
|
||||
amroutine->amcanunique = false;
|
||||
amroutine->amcanmulticol = false;
|
||||
amroutine->amoptionalkey = true;
|
||||
amroutine->amsearcharray = false;
|
||||
amroutine->amsearchnulls = false;
|
||||
amroutine->amstorage = false;
|
||||
amroutine->amclusterable = false;
|
||||
amroutine->ampredlocks = false;
|
||||
amroutine->amcanparallel = false;
|
||||
amroutine->amcaninclude = false;
|
||||
amroutine->amusemaintenanceworkmem = false; /* not used during VACUUM */
|
||||
amroutine->amparallelvacuumoptions = VACUUM_OPTION_PARALLEL_BULKDEL;
|
||||
amroutine->amkeytype = InvalidOid;
|
||||
|
||||
/* Interface functions */
|
||||
amroutine->ambuild = hnsw_build;
|
||||
amroutine->ambuildempty = hnsw_buildempty;
|
||||
amroutine->aminsert = hnsw_insert;
|
||||
amroutine->ambulkdelete = hnsw_bulkdelete;
|
||||
amroutine->amvacuumcleanup = hnsw_vacuumcleanup;
|
||||
amroutine->amcanreturn = NULL; /* tuple not included in heapsort */
|
||||
amroutine->amcostestimate = hnsw_costestimate;
|
||||
amroutine->amoptions = hnsw_options;
|
||||
amroutine->amproperty = NULL; /* TODO AMPROP_DISTANCE_ORDERABLE */
|
||||
amroutine->ambuildphasename = NULL;
|
||||
amroutine->amvalidate = hnsw_validate;
|
||||
amroutine->amadjustmembers = NULL;
|
||||
amroutine->ambeginscan = hnsw_beginscan;
|
||||
amroutine->amrescan = hnsw_rescan;
|
||||
amroutine->amgettuple = hnsw_gettuple;
|
||||
amroutine->amgetbitmap = NULL;
|
||||
amroutine->amendscan = hnsw_endscan;
|
||||
amroutine->ammarkpos = NULL;
|
||||
amroutine->amrestrpos = NULL;
|
||||
|
||||
/* Interface functions to support parallel index scans */
|
||||
amroutine->amestimateparallelscan = NULL;
|
||||
amroutine->aminitparallelscan = NULL;
|
||||
amroutine->amparallelrescan = NULL;
|
||||
|
||||
PG_RETURN_POINTER(amroutine);
|
||||
}
|
||||
|
||||
/*
|
||||
* Get the L2 distance between vectors
|
||||
*/
|
||||
PGDLLEXPORT PG_FUNCTION_INFO_V1(l2_distance);
|
||||
Datum
|
||||
l2_distance(PG_FUNCTION_ARGS)
|
||||
{
|
||||
ArrayType *a = PG_GETARG_ARRAYTYPE_P(0);
|
||||
ArrayType *b = PG_GETARG_ARRAYTYPE_P(1);
|
||||
int a_dim = ArrayGetNItems(ARR_NDIM(a), ARR_DIMS(a));
|
||||
int b_dim = ArrayGetNItems(ARR_NDIM(b), ARR_DIMS(b));
|
||||
dist_t distance = 0.0;
|
||||
dist_t diff;
|
||||
coord_t *ax = (coord_t*)ARR_DATA_PTR(a);
|
||||
coord_t *bx = (coord_t*)ARR_DATA_PTR(b);
|
||||
|
||||
if (a_dim != b_dim)
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_DATA_EXCEPTION),
|
||||
errmsg("different array dimensions %d and %d", a_dim, b_dim)));
|
||||
}
|
||||
|
||||
for (int i = 0; i < a_dim; i++)
|
||||
{
|
||||
diff = ax[i] - bx[i];
|
||||
distance += diff * diff;
|
||||
}
|
||||
|
||||
PG_RETURN_FLOAT4((dist_t)sqrt(distance));
|
||||
}
|
||||
4
pgxn/hnsw/hnsw.control
Normal file
4
pgxn/hnsw/hnsw.control
Normal file
@@ -0,0 +1,4 @@
|
||||
comment = '** Deprecated ** Please use pg_embedding instead'
|
||||
default_version = '0.1.0'
|
||||
module_pathname = '$libdir/hnsw'
|
||||
relocatable = true
|
||||
15
pgxn/hnsw/hnsw.h
Normal file
15
pgxn/hnsw/hnsw.h
Normal file
@@ -0,0 +1,15 @@
|
||||
#pragma once
|
||||
|
||||
typedef float coord_t;
|
||||
typedef float dist_t;
|
||||
typedef uint32_t idx_t;
|
||||
typedef uint64_t label_t;
|
||||
|
||||
typedef struct HierarchicalNSW HierarchicalNSW;
|
||||
|
||||
bool hnsw_search(HierarchicalNSW* hnsw, const coord_t *point, size_t efSearch, size_t* n_results, label_t** results);
|
||||
bool hnsw_add_point(HierarchicalNSW* hnsw, const coord_t *point, label_t label);
|
||||
void hnsw_init(HierarchicalNSW* hnsw, size_t dim, size_t maxelements, size_t M, size_t maxM, size_t efConstruction);
|
||||
int hnsw_dimensions(HierarchicalNSW* hnsw);
|
||||
size_t hnsw_count(HierarchicalNSW* hnsw);
|
||||
size_t hnsw_sizeof(void);
|
||||
379
pgxn/hnsw/hnswalg.cpp
Normal file
379
pgxn/hnsw/hnswalg.cpp
Normal file
@@ -0,0 +1,379 @@
|
||||
#include "hnswalg.h"
|
||||
|
||||
#if defined(__GNUC__)
|
||||
#define PORTABLE_ALIGN32 __attribute__((aligned(32)))
|
||||
#define PREFETCH(addr,hint) __builtin_prefetch(addr, 0, hint)
|
||||
#else
|
||||
#define PORTABLE_ALIGN32 __declspec(align(32))
|
||||
#define PREFETCH(addr,hint)
|
||||
#endif
|
||||
|
||||
HierarchicalNSW::HierarchicalNSW(size_t dim_, size_t maxelements_, size_t M_, size_t maxM_, size_t efConstruction_)
|
||||
{
|
||||
dim = dim_;
|
||||
data_size = dim * sizeof(coord_t);
|
||||
|
||||
efConstruction = efConstruction_;
|
||||
|
||||
maxelements = maxelements_;
|
||||
M = M_;
|
||||
maxM = maxM_;
|
||||
size_links_level0 = (maxM + 1) * sizeof(idx_t);
|
||||
size_data_per_element = size_links_level0 + data_size + sizeof(label_t);
|
||||
offset_data = size_links_level0;
|
||||
offset_label = offset_data + data_size;
|
||||
|
||||
enterpoint_node = 0;
|
||||
cur_element_count = 0;
|
||||
#ifdef __x86_64__
|
||||
use_avx2 = __builtin_cpu_supports("avx2");
|
||||
#endif
|
||||
}
|
||||
|
||||
std::priority_queue<std::pair<dist_t, idx_t>> HierarchicalNSW::searchBaseLayer(const coord_t *point, size_t ef)
|
||||
{
|
||||
std::vector<uint32_t> visited;
|
||||
visited.resize((cur_element_count + 31) >> 5);
|
||||
|
||||
std::priority_queue<std::pair<dist_t, idx_t >> topResults;
|
||||
std::priority_queue<std::pair<dist_t, idx_t >> candidateSet;
|
||||
|
||||
dist_t dist = fstdistfunc(point, getDataByInternalId(enterpoint_node));
|
||||
|
||||
topResults.emplace(dist, enterpoint_node);
|
||||
candidateSet.emplace(-dist, enterpoint_node);
|
||||
visited[enterpoint_node >> 5] = 1 << (enterpoint_node & 31);
|
||||
dist_t lowerBound = dist;
|
||||
|
||||
while (!candidateSet.empty())
|
||||
{
|
||||
std::pair<dist_t, idx_t> curr_el_pair = candidateSet.top();
|
||||
if (-curr_el_pair.first > lowerBound)
|
||||
break;
|
||||
|
||||
candidateSet.pop();
|
||||
idx_t curNodeNum = curr_el_pair.second;
|
||||
|
||||
idx_t* data = get_linklist0(curNodeNum);
|
||||
size_t size = *data++;
|
||||
|
||||
PREFETCH(getDataByInternalId(*data), 0);
|
||||
|
||||
for (size_t j = 0; j < size; ++j) {
|
||||
size_t tnum = *(data + j);
|
||||
|
||||
PREFETCH(getDataByInternalId(*(data + j + 1)), 0);
|
||||
|
||||
if (!(visited[tnum >> 5] & (1 << (tnum & 31)))) {
|
||||
visited[tnum >> 5] |= 1 << (tnum & 31);
|
||||
|
||||
dist = fstdistfunc(point, getDataByInternalId(tnum));
|
||||
|
||||
if (topResults.top().first > dist || topResults.size() < ef) {
|
||||
candidateSet.emplace(-dist, tnum);
|
||||
|
||||
PREFETCH(get_linklist0(candidateSet.top().second), 0);
|
||||
topResults.emplace(dist, tnum);
|
||||
|
||||
if (topResults.size() > ef)
|
||||
topResults.pop();
|
||||
|
||||
lowerBound = topResults.top().first;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return topResults;
|
||||
}
|
||||
|
||||
|
||||
void HierarchicalNSW::getNeighborsByHeuristic(std::priority_queue<std::pair<dist_t, idx_t>> &topResults, size_t NN)
|
||||
{
|
||||
if (topResults.size() < NN)
|
||||
return;
|
||||
|
||||
std::priority_queue<std::pair<dist_t, idx_t>> resultSet;
|
||||
std::vector<std::pair<dist_t, idx_t>> returnlist;
|
||||
|
||||
while (topResults.size() > 0) {
|
||||
resultSet.emplace(-topResults.top().first, topResults.top().second);
|
||||
topResults.pop();
|
||||
}
|
||||
|
||||
while (resultSet.size()) {
|
||||
if (returnlist.size() >= NN)
|
||||
break;
|
||||
std::pair<dist_t, idx_t> curen = resultSet.top();
|
||||
dist_t dist_to_query = -curen.first;
|
||||
resultSet.pop();
|
||||
bool good = true;
|
||||
for (std::pair<dist_t, idx_t> curen2 : returnlist) {
|
||||
dist_t curdist = fstdistfunc(getDataByInternalId(curen2.second),
|
||||
getDataByInternalId(curen.second));
|
||||
if (curdist < dist_to_query) {
|
||||
good = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (good) returnlist.push_back(curen);
|
||||
}
|
||||
for (std::pair<dist_t, idx_t> elem : returnlist)
|
||||
topResults.emplace(-elem.first, elem.second);
|
||||
}
|
||||
|
||||
void HierarchicalNSW::mutuallyConnectNewElement(const coord_t *point, idx_t cur_c,
|
||||
std::priority_queue<std::pair<dist_t, idx_t>> topResults)
|
||||
{
|
||||
getNeighborsByHeuristic(topResults, M);
|
||||
|
||||
std::vector<idx_t> res;
|
||||
res.reserve(M);
|
||||
while (topResults.size() > 0) {
|
||||
res.push_back(topResults.top().second);
|
||||
topResults.pop();
|
||||
}
|
||||
{
|
||||
idx_t* data = get_linklist0(cur_c);
|
||||
if (*data)
|
||||
throw std::runtime_error("Should be blank");
|
||||
|
||||
*data++ = res.size();
|
||||
|
||||
for (size_t idx = 0; idx < res.size(); idx++) {
|
||||
if (data[idx])
|
||||
throw std::runtime_error("Should be blank");
|
||||
data[idx] = res[idx];
|
||||
}
|
||||
}
|
||||
for (size_t idx = 0; idx < res.size(); idx++) {
|
||||
if (res[idx] == cur_c)
|
||||
throw std::runtime_error("Connection to the same element");
|
||||
|
||||
size_t resMmax = maxM;
|
||||
idx_t *ll_other = get_linklist0(res[idx]);
|
||||
idx_t sz_link_list_other = *ll_other;
|
||||
|
||||
if (sz_link_list_other > resMmax || sz_link_list_other < 0)
|
||||
throw std::runtime_error("Bad sz_link_list_other");
|
||||
|
||||
if (sz_link_list_other < resMmax) {
|
||||
idx_t *data = ll_other + 1;
|
||||
data[sz_link_list_other] = cur_c;
|
||||
*ll_other = sz_link_list_other + 1;
|
||||
} else {
|
||||
// finding the "weakest" element to replace it with the new one
|
||||
idx_t *data = ll_other + 1;
|
||||
dist_t d_max = fstdistfunc(getDataByInternalId(cur_c), getDataByInternalId(res[idx]));
|
||||
// Heuristic:
|
||||
std::priority_queue<std::pair<dist_t, idx_t>> candidates;
|
||||
candidates.emplace(d_max, cur_c);
|
||||
|
||||
for (size_t j = 0; j < sz_link_list_other; j++)
|
||||
candidates.emplace(fstdistfunc(getDataByInternalId(data[j]), getDataByInternalId(res[idx])), data[j]);
|
||||
|
||||
getNeighborsByHeuristic(candidates, resMmax);
|
||||
|
||||
size_t indx = 0;
|
||||
while (!candidates.empty()) {
|
||||
data[indx] = candidates.top().second;
|
||||
candidates.pop();
|
||||
indx++;
|
||||
}
|
||||
*ll_other = indx;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void HierarchicalNSW::addPoint(const coord_t *point, label_t label)
|
||||
{
|
||||
if (cur_element_count >= maxelements) {
|
||||
throw std::runtime_error("The number of elements exceeds the specified limit");
|
||||
}
|
||||
idx_t cur_c = cur_element_count++;
|
||||
memset((char *) get_linklist0(cur_c), 0, size_data_per_element);
|
||||
memcpy(getDataByInternalId(cur_c), point, data_size);
|
||||
memcpy(getExternalLabel(cur_c), &label, sizeof label);
|
||||
|
||||
// Do nothing for the first element
|
||||
if (cur_c != 0) {
|
||||
std::priority_queue <std::pair<dist_t, idx_t>> topResults = searchBaseLayer(point, efConstruction);
|
||||
mutuallyConnectNewElement(point, cur_c, topResults);
|
||||
}
|
||||
};
|
||||
|
||||
std::priority_queue<std::pair<dist_t, label_t>> HierarchicalNSW::searchKnn(const coord_t *query, size_t k)
|
||||
{
|
||||
std::priority_queue<std::pair<dist_t, label_t>> topResults;
|
||||
auto topCandidates = searchBaseLayer(query, k);
|
||||
while (topCandidates.size() > k) {
|
||||
topCandidates.pop();
|
||||
}
|
||||
while (!topCandidates.empty()) {
|
||||
std::pair<dist_t, idx_t> rez = topCandidates.top();
|
||||
label_t label;
|
||||
memcpy(&label, getExternalLabel(rez.second), sizeof(label));
|
||||
topResults.push(std::pair<dist_t, label_t>(rez.first, label));
|
||||
topCandidates.pop();
|
||||
}
|
||||
|
||||
return topResults;
|
||||
};
|
||||
|
||||
dist_t fstdistfunc_scalar(const coord_t *x, const coord_t *y, size_t n)
|
||||
{
|
||||
dist_t distance = 0.0;
|
||||
|
||||
for (size_t i = 0; i < n; i++)
|
||||
{
|
||||
dist_t diff = x[i] - y[i];
|
||||
distance += diff * diff;
|
||||
}
|
||||
return distance;
|
||||
|
||||
}
|
||||
|
||||
#ifdef __x86_64__
|
||||
#include <immintrin.h>
|
||||
|
||||
__attribute__((target("avx2")))
|
||||
dist_t fstdistfunc_avx2(const coord_t *x, const coord_t *y, size_t n)
|
||||
{
|
||||
const size_t TmpResSz = sizeof(__m256) / sizeof(float);
|
||||
float PORTABLE_ALIGN32 TmpRes[TmpResSz];
|
||||
size_t qty16 = n / 16;
|
||||
const float *pEnd1 = x + (qty16 * 16);
|
||||
__m256 diff, v1, v2;
|
||||
__m256 sum = _mm256_set1_ps(0);
|
||||
|
||||
while (x < pEnd1) {
|
||||
v1 = _mm256_loadu_ps(x);
|
||||
x += 8;
|
||||
v2 = _mm256_loadu_ps(y);
|
||||
y += 8;
|
||||
diff = _mm256_sub_ps(v1, v2);
|
||||
sum = _mm256_add_ps(sum, _mm256_mul_ps(diff, diff));
|
||||
|
||||
v1 = _mm256_loadu_ps(x);
|
||||
x += 8;
|
||||
v2 = _mm256_loadu_ps(y);
|
||||
y += 8;
|
||||
diff = _mm256_sub_ps(v1, v2);
|
||||
sum = _mm256_add_ps(sum, _mm256_mul_ps(diff, diff));
|
||||
}
|
||||
_mm256_store_ps(TmpRes, sum);
|
||||
float res = TmpRes[0] + TmpRes[1] + TmpRes[2] + TmpRes[3] + TmpRes[4] + TmpRes[5] + TmpRes[6] + TmpRes[7];
|
||||
return (res);
|
||||
}
|
||||
|
||||
dist_t fstdistfunc_sse(const coord_t *x, const coord_t *y, size_t n)
|
||||
{
|
||||
const size_t TmpResSz = sizeof(__m128) / sizeof(float);
|
||||
float PORTABLE_ALIGN32 TmpRes[TmpResSz];
|
||||
size_t qty16 = n / 16;
|
||||
const float *pEnd1 = x + (qty16 * 16);
|
||||
|
||||
__m128 diff, v1, v2;
|
||||
__m128 sum = _mm_set1_ps(0);
|
||||
|
||||
while (x < pEnd1) {
|
||||
v1 = _mm_loadu_ps(x);
|
||||
x += 4;
|
||||
v2 = _mm_loadu_ps(y);
|
||||
y += 4;
|
||||
diff = _mm_sub_ps(v1, v2);
|
||||
sum = _mm_add_ps(sum, _mm_mul_ps(diff, diff));
|
||||
|
||||
v1 = _mm_loadu_ps(x);
|
||||
x += 4;
|
||||
v2 = _mm_loadu_ps(y);
|
||||
y += 4;
|
||||
diff = _mm_sub_ps(v1, v2);
|
||||
sum = _mm_add_ps(sum, _mm_mul_ps(diff, diff));
|
||||
|
||||
v1 = _mm_loadu_ps(x);
|
||||
x += 4;
|
||||
v2 = _mm_loadu_ps(y);
|
||||
y += 4;
|
||||
diff = _mm_sub_ps(v1, v2);
|
||||
sum = _mm_add_ps(sum, _mm_mul_ps(diff, diff));
|
||||
|
||||
v1 = _mm_loadu_ps(x);
|
||||
x += 4;
|
||||
v2 = _mm_loadu_ps(y);
|
||||
y += 4;
|
||||
diff = _mm_sub_ps(v1, v2);
|
||||
sum = _mm_add_ps(sum, _mm_mul_ps(diff, diff));
|
||||
}
|
||||
_mm_store_ps(TmpRes, sum);
|
||||
float res = TmpRes[0] + TmpRes[1] + TmpRes[2] + TmpRes[3];
|
||||
return res;
|
||||
}
|
||||
#endif
|
||||
|
||||
dist_t HierarchicalNSW::fstdistfunc(const coord_t *x, const coord_t *y)
|
||||
{
|
||||
#ifndef __x86_64__
|
||||
return fstdistfunc_scalar(x, y, dim);
|
||||
#else
|
||||
if(use_avx2)
|
||||
return fstdistfunc_avx2(x, y, dim);
|
||||
|
||||
return fstdistfunc_sse(x, y, dim);
|
||||
#endif
|
||||
}
|
||||
|
||||
bool hnsw_search(HierarchicalNSW* hnsw, const coord_t *point, size_t efSearch, size_t* n_results, label_t** results)
|
||||
{
|
||||
try
|
||||
{
|
||||
auto result = hnsw->searchKnn(point, efSearch);
|
||||
size_t nResults = result.size();
|
||||
*results = (label_t*)malloc(nResults*sizeof(label_t));
|
||||
for (size_t i = nResults; i-- != 0;)
|
||||
{
|
||||
(*results)[i] = result.top().second;
|
||||
result.pop();
|
||||
}
|
||||
*n_results = nResults;
|
||||
return true;
|
||||
}
|
||||
catch (std::exception& x)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
bool hnsw_add_point(HierarchicalNSW* hnsw, const coord_t *point, label_t label)
|
||||
{
|
||||
try
|
||||
{
|
||||
hnsw->addPoint(point, label);
|
||||
return true;
|
||||
}
|
||||
catch (std::exception& x)
|
||||
{
|
||||
fprintf(stderr, "Catch %s\n", x.what());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
void hnsw_init(HierarchicalNSW* hnsw, size_t dims, size_t maxelements, size_t M, size_t maxM, size_t efConstruction)
|
||||
{
|
||||
new ((void*)hnsw) HierarchicalNSW(dims, maxelements, M, maxM, efConstruction);
|
||||
}
|
||||
|
||||
|
||||
int hnsw_dimensions(HierarchicalNSW* hnsw)
|
||||
{
|
||||
return (int)hnsw->dim;
|
||||
}
|
||||
|
||||
size_t hnsw_count(HierarchicalNSW* hnsw)
|
||||
{
|
||||
return hnsw->cur_element_count;
|
||||
}
|
||||
|
||||
size_t hnsw_sizeof(void)
|
||||
{
|
||||
return sizeof(HierarchicalNSW);
|
||||
}
|
||||
69
pgxn/hnsw/hnswalg.h
Normal file
69
pgxn/hnsw/hnswalg.h
Normal file
@@ -0,0 +1,69 @@
|
||||
#pragma once
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <stdint.h>
|
||||
#include <unordered_map>
|
||||
#include <unordered_set>
|
||||
#include <map>
|
||||
#include <cmath>
|
||||
#include <queue>
|
||||
#include <stdexcept>
|
||||
|
||||
extern "C" {
|
||||
#include "hnsw.h"
|
||||
}
|
||||
|
||||
struct HierarchicalNSW
|
||||
{
|
||||
size_t maxelements;
|
||||
size_t cur_element_count;
|
||||
|
||||
idx_t enterpoint_node;
|
||||
|
||||
size_t dim;
|
||||
size_t data_size;
|
||||
size_t offset_data;
|
||||
size_t offset_label;
|
||||
size_t size_data_per_element;
|
||||
size_t M;
|
||||
size_t maxM;
|
||||
size_t size_links_level0;
|
||||
size_t efConstruction;
|
||||
|
||||
#ifdef __x86_64__
|
||||
bool use_avx2;
|
||||
#endif
|
||||
|
||||
char data_level0_memory[0]; // varying size
|
||||
|
||||
public:
|
||||
HierarchicalNSW(size_t dim, size_t maxelements, size_t M, size_t maxM, size_t efConstruction);
|
||||
~HierarchicalNSW();
|
||||
|
||||
|
||||
inline coord_t *getDataByInternalId(idx_t internal_id) const {
|
||||
return (coord_t *)&data_level0_memory[internal_id * size_data_per_element + offset_data];
|
||||
}
|
||||
|
||||
inline idx_t *get_linklist0(idx_t internal_id) const {
|
||||
return (idx_t*)&data_level0_memory[internal_id * size_data_per_element];
|
||||
}
|
||||
|
||||
inline label_t *getExternalLabel(idx_t internal_id) const {
|
||||
return (label_t *)&data_level0_memory[internal_id * size_data_per_element + offset_label];
|
||||
}
|
||||
|
||||
std::priority_queue<std::pair<dist_t, idx_t>> searchBaseLayer(const coord_t *x, size_t ef);
|
||||
|
||||
void getNeighborsByHeuristic(std::priority_queue<std::pair<dist_t, idx_t>> &topResults, size_t NN);
|
||||
|
||||
void mutuallyConnectNewElement(const coord_t *x, idx_t id, std::priority_queue<std::pair<dist_t, idx_t>> topResults);
|
||||
|
||||
void addPoint(const coord_t *point, label_t label);
|
||||
|
||||
std::priority_queue<std::pair<dist_t, label_t>> searchKnn(const coord_t *query_data, size_t k);
|
||||
|
||||
dist_t fstdistfunc(const coord_t *x, const coord_t *y);
|
||||
};
|
||||
28
pgxn/hnsw/test/expected/knn.out
Normal file
28
pgxn/hnsw/test/expected/knn.out
Normal file
@@ -0,0 +1,28 @@
|
||||
SET enable_seqscan = off;
|
||||
CREATE TABLE t (val real[]);
|
||||
INSERT INTO t (val) VALUES ('{0,0,0}'), ('{1,2,3}'), ('{1,1,1}'), (NULL);
|
||||
CREATE INDEX ON t USING hnsw (val) WITH (maxelements = 10, dims=3, m=3);
|
||||
INSERT INTO t (val) VALUES (array[1,2,4]);
|
||||
explain SELECT * FROM t ORDER BY val <-> array[3,3,3];
|
||||
QUERY PLAN
|
||||
--------------------------------------------------------------------
|
||||
Index Scan using t_val_idx on t (cost=4.02..8.06 rows=3 width=36)
|
||||
Order By: (val <-> '{3,3,3}'::real[])
|
||||
(2 rows)
|
||||
|
||||
SELECT * FROM t ORDER BY val <-> array[3,3,3];
|
||||
val
|
||||
---------
|
||||
{1,2,3}
|
||||
{1,2,4}
|
||||
{1,1,1}
|
||||
{0,0,0}
|
||||
(4 rows)
|
||||
|
||||
SELECT COUNT(*) FROM t;
|
||||
count
|
||||
-------
|
||||
5
|
||||
(1 row)
|
||||
|
||||
DROP TABLE t;
|
||||
13
pgxn/hnsw/test/sql/knn.sql
Normal file
13
pgxn/hnsw/test/sql/knn.sql
Normal file
@@ -0,0 +1,13 @@
|
||||
SET enable_seqscan = off;
|
||||
|
||||
CREATE TABLE t (val real[]);
|
||||
INSERT INTO t (val) VALUES ('{0,0,0}'), ('{1,2,3}'), ('{1,1,1}'), (NULL);
|
||||
CREATE INDEX ON t USING hnsw (val) WITH (maxelements = 10, dims=3, m=3);
|
||||
|
||||
INSERT INTO t (val) VALUES (array[1,2,4]);
|
||||
|
||||
explain SELECT * FROM t ORDER BY val <-> array[3,3,3];
|
||||
SELECT * FROM t ORDER BY val <-> array[3,3,3];
|
||||
SELECT COUNT(*) FROM t;
|
||||
|
||||
DROP TABLE t;
|
||||
8
poetry.lock
generated
8
poetry.lock
generated
@@ -2028,13 +2028,13 @@ openapi-schema-validator = ">=0.4.2,<0.5.0"
|
||||
|
||||
[[package]]
|
||||
name = "packaging"
|
||||
version = "24.2"
|
||||
version = "23.0"
|
||||
description = "Core utilities for Python packages"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
python-versions = ">=3.7"
|
||||
files = [
|
||||
{file = "packaging-24.2-py3-none-any.whl", hash = "sha256:09abb1bccd265c01f4a3aa3f7a7db064b36514d2cba19a2f694fe6150451a759"},
|
||||
{file = "packaging-24.2.tar.gz", hash = "sha256:c228a6dc5e932d346bc5739379109d49e8853dd8223571c7c5b55260edc0b97f"},
|
||||
{file = "packaging-23.0-py3-none-any.whl", hash = "sha256:714ac14496c3e68c99c29b00845f7a2b85f3bb6f1078fd9f72fd20f0570002b2"},
|
||||
{file = "packaging-23.0.tar.gz", hash = "sha256:b6ad297f8907de0fa2fe1ccbd26fdaf387f5f47c7275fedf8cce89f99446cf97"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
use std::fmt;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use postgres_client::config::SslMode;
|
||||
use pq_proto::BeMessage as Be;
|
||||
use std::fmt;
|
||||
use thiserror::Error;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{info, info_span};
|
||||
@@ -13,13 +12,10 @@ use crate::auth::IpPattern;
|
||||
use crate::cache::Cached;
|
||||
use crate::config::AuthenticationConfig;
|
||||
use crate::context::RequestContext;
|
||||
use crate::control_plane::client::cplane_proxy_v1;
|
||||
use crate::control_plane::{self, CachedNodeInfo, NodeInfo};
|
||||
use crate::control_plane::{self, client::cplane_proxy_v1, CachedNodeInfo, NodeInfo};
|
||||
use crate::error::{ReportableError, UserFacingError};
|
||||
use crate::proxy::connect_compute::ComputeConnectBackend;
|
||||
use crate::proxy::NeonOptions;
|
||||
use crate::stream::PqStream;
|
||||
use crate::types::RoleName;
|
||||
use crate::{auth, compute, waiters};
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
@@ -109,16 +105,10 @@ impl ConsoleRedirectBackend {
|
||||
ctx: &RequestContext,
|
||||
auth_config: &'static AuthenticationConfig,
|
||||
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
|
||||
) -> auth::Result<(
|
||||
ConsoleRedirectNodeInfo,
|
||||
ComputeUserInfo,
|
||||
Option<Vec<IpPattern>>,
|
||||
)> {
|
||||
) -> auth::Result<(ConsoleRedirectNodeInfo, Option<Vec<IpPattern>>)> {
|
||||
authenticate(ctx, auth_config, &self.console_uri, client)
|
||||
.await
|
||||
.map(|(node_info, user_info, ip_allowlist)| {
|
||||
(ConsoleRedirectNodeInfo(node_info), user_info, ip_allowlist)
|
||||
})
|
||||
.map(|(node_info, ip_allowlist)| (ConsoleRedirectNodeInfo(node_info), ip_allowlist))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -143,7 +133,7 @@ async fn authenticate(
|
||||
auth_config: &'static AuthenticationConfig,
|
||||
link_uri: &reqwest::Url,
|
||||
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
|
||||
) -> auth::Result<(NodeInfo, ComputeUserInfo, Option<Vec<IpPattern>>)> {
|
||||
) -> auth::Result<(NodeInfo, Option<Vec<IpPattern>>)> {
|
||||
ctx.set_auth_method(crate::context::AuthMethod::ConsoleRedirect);
|
||||
|
||||
// registering waiter can fail if we get unlucky with rng.
|
||||
@@ -198,15 +188,8 @@ async fn authenticate(
|
||||
let mut config = compute::ConnCfg::new(db_info.host.to_string(), db_info.port);
|
||||
config.dbname(&db_info.dbname).user(&db_info.user);
|
||||
|
||||
let user: RoleName = db_info.user.into();
|
||||
let user_info = ComputeUserInfo {
|
||||
endpoint: db_info.aux.endpoint_id.as_str().into(),
|
||||
user: user.clone(),
|
||||
options: NeonOptions::default(),
|
||||
};
|
||||
|
||||
ctx.set_dbname(db_info.dbname.into());
|
||||
ctx.set_user(user);
|
||||
ctx.set_user(db_info.user.into());
|
||||
ctx.set_project(db_info.aux.clone());
|
||||
info!("woken up a compute node");
|
||||
|
||||
@@ -229,7 +212,6 @@ async fn authenticate(
|
||||
config,
|
||||
aux: db_info.aux,
|
||||
},
|
||||
user_info,
|
||||
db_info.allowed_ips,
|
||||
))
|
||||
}
|
||||
|
||||
@@ -24,8 +24,10 @@ use crate::control_plane::messages::MetricsAuxInfo;
|
||||
use crate::error::{ReportableError, UserFacingError};
|
||||
use crate::metrics::{Metrics, NumDbConnectionsGuard};
|
||||
use crate::proxy::neon_option;
|
||||
use crate::proxy::NeonOptions;
|
||||
use crate::tls::postgres_rustls::MakeRustlsConnect;
|
||||
use crate::types::Host;
|
||||
use crate::types::{EndpointId, RoleName};
|
||||
|
||||
pub const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node";
|
||||
|
||||
@@ -251,7 +253,6 @@ impl ConnCfg {
|
||||
ctx: &RequestContext,
|
||||
aux: MetricsAuxInfo,
|
||||
config: &ComputeConfig,
|
||||
user_info: ComputeUserInfo,
|
||||
) -> Result<PostgresConnection, ConnectionError> {
|
||||
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
|
||||
let (socket_addr, stream, host) = self.connect_raw(config.timeout).await?;
|
||||
@@ -286,6 +287,28 @@ impl ConnCfg {
|
||||
self.0.get_ssl_mode()
|
||||
);
|
||||
|
||||
let compute_info = match parameters.get("user") {
|
||||
Some(user) => {
|
||||
match parameters.get("database") {
|
||||
Some(database) => {
|
||||
ComputeUserInfo {
|
||||
user: RoleName::from(user),
|
||||
options: NeonOptions::default(), // just a shim, we don't need options
|
||||
endpoint: EndpointId::from(database),
|
||||
}
|
||||
}
|
||||
None => {
|
||||
warn!("compute node didn't return database name");
|
||||
ComputeUserInfo::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
warn!("compute node didn't return user name");
|
||||
ComputeUserInfo::default()
|
||||
}
|
||||
};
|
||||
|
||||
// NB: CancelToken is supposed to hold socket_addr, but we use connect_raw.
|
||||
// Yet another reason to rework the connection establishing code.
|
||||
let cancel_closure = CancelClosure::new(
|
||||
@@ -298,7 +321,7 @@ impl ConnCfg {
|
||||
},
|
||||
vec![], // TODO: deprecated, will be removed
|
||||
host.to_string(),
|
||||
user_info,
|
||||
compute_info,
|
||||
);
|
||||
|
||||
let connection = PostgresConnection {
|
||||
|
||||
@@ -195,7 +195,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
|
||||
ctx.set_db_options(params.clone());
|
||||
|
||||
let (node_info, user_info, ip_allowlist) = match backend
|
||||
let (user_info, ip_allowlist) = match backend
|
||||
.authenticate(ctx, &config.authentication_config, &mut stream)
|
||||
.await
|
||||
{
|
||||
@@ -208,12 +208,11 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
let mut node = connect_to_compute(
|
||||
ctx,
|
||||
&TcpMechanism {
|
||||
user_info,
|
||||
params_compat: true,
|
||||
params: ¶ms,
|
||||
locks: &config.connect_compute_locks,
|
||||
},
|
||||
&node_info,
|
||||
&user_info,
|
||||
config.wake_compute_retry_config,
|
||||
&config.connect_to_compute,
|
||||
)
|
||||
|
||||
@@ -74,11 +74,8 @@ impl NodeInfo {
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
config: &ComputeConfig,
|
||||
user_info: ComputeUserInfo,
|
||||
) -> Result<compute::PostgresConnection, compute::ConnectionError> {
|
||||
self.config
|
||||
.connect(ctx, self.aux.clone(), config, user_info)
|
||||
.await
|
||||
self.config.connect(ctx, self.aux.clone(), config).await
|
||||
}
|
||||
|
||||
pub(crate) fn reuse_settings(&mut self, other: Self) {
|
||||
|
||||
@@ -4,7 +4,7 @@ use tokio::time;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
use super::retry::ShouldRetryWakeCompute;
|
||||
use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo};
|
||||
use crate::auth::backend::ComputeCredentialKeys;
|
||||
use crate::compute::{self, PostgresConnection, COULD_NOT_CONNECT};
|
||||
use crate::config::{ComputeConfig, RetryConfig};
|
||||
use crate::context::RequestContext;
|
||||
@@ -71,8 +71,6 @@ pub(crate) struct TcpMechanism<'a> {
|
||||
|
||||
/// connect_to_compute concurrency lock
|
||||
pub(crate) locks: &'static ApiLocks<Host>,
|
||||
|
||||
pub(crate) user_info: ComputeUserInfo,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -90,7 +88,7 @@ impl ConnectMechanism for TcpMechanism<'_> {
|
||||
) -> Result<PostgresConnection, Self::Error> {
|
||||
let host = node_info.config.get_host();
|
||||
let permit = self.locks.get_permit(&host).await?;
|
||||
permit.release_result(node_info.connect(ctx, config, self.user_info.clone()).await)
|
||||
permit.release_result(node_info.connect(ctx, config).await)
|
||||
}
|
||||
|
||||
fn update_connect_config(&self, config: &mut compute::ConnCfg) {
|
||||
|
||||
@@ -332,19 +332,16 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
}
|
||||
};
|
||||
|
||||
let compute_user_info = match &user_info {
|
||||
auth::Backend::ControlPlane(_, info) => &info.info,
|
||||
auth::Backend::Local(_) => unreachable!("local proxy does not run tcp proxy service"),
|
||||
let params_compat = match &user_info {
|
||||
auth::Backend::ControlPlane(_, info) => {
|
||||
info.info.options.get(NeonOptions::PARAMS_COMPAT).is_some()
|
||||
}
|
||||
auth::Backend::Local(_) => false,
|
||||
};
|
||||
let params_compat = compute_user_info
|
||||
.options
|
||||
.get(NeonOptions::PARAMS_COMPAT)
|
||||
.is_some();
|
||||
|
||||
let mut node = connect_to_compute(
|
||||
ctx,
|
||||
&TcpMechanism {
|
||||
user_info: compute_user_info.clone(),
|
||||
params_compat,
|
||||
params: ¶ms,
|
||||
locks: &config.connect_compute_locks,
|
||||
|
||||
@@ -74,11 +74,7 @@ pub(crate) enum Notification {
|
||||
#[serde(rename = "/cancel_session")]
|
||||
Cancel(CancelSession),
|
||||
|
||||
#[serde(
|
||||
other,
|
||||
deserialize_with = "deserialize_unknown_topic",
|
||||
skip_serializing
|
||||
)]
|
||||
#[serde(other, skip_serializing)]
|
||||
UnknownTopic,
|
||||
}
|
||||
|
||||
@@ -127,15 +123,6 @@ where
|
||||
serde_json::from_str(&s).map_err(<D::Error as serde::de::Error>::custom)
|
||||
}
|
||||
|
||||
// https://github.com/serde-rs/serde/issues/1714
|
||||
fn deserialize_unknown_topic<'de, D>(deserializer: D) -> Result<(), D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
deserializer.deserialize_any(serde::de::IgnoredAny)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct MessageHandler<C: ProjectInfoCache + Send + Sync + 'static> {
|
||||
cache: Arc<C>,
|
||||
cancellation_handler: Arc<CancellationHandler<()>>,
|
||||
@@ -471,30 +458,4 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_unknown_topic() -> anyhow::Result<()> {
|
||||
let with_data = json!({
|
||||
"type": "message",
|
||||
"topic": "/doesnotexist",
|
||||
"data": {
|
||||
"payload": "ignored"
|
||||
},
|
||||
"extra_fields": "something"
|
||||
})
|
||||
.to_string();
|
||||
let result: Notification = serde_json::from_str(&with_data)?;
|
||||
assert_eq!(result, Notification::UnknownTopic);
|
||||
|
||||
let without_data = json!({
|
||||
"type": "message",
|
||||
"topic": "/doesnotexist",
|
||||
"extra_fields": "something"
|
||||
})
|
||||
.to_string();
|
||||
let result: Notification = serde_json::from_str(&without_data)?;
|
||||
assert_eq!(result, Notification::UnknownTopic);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[toolchain]
|
||||
channel = "1.84.0"
|
||||
channel = "1.83.0"
|
||||
profile = "default"
|
||||
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
|
||||
# https://rust-lang.github.io/rustup/concepts/profiles.html
|
||||
|
||||
@@ -495,7 +495,6 @@ impl Manager {
|
||||
}
|
||||
|
||||
/// Update is_active flag and returns its value.
|
||||
// Timelines marked active are pushed to the broker by the `push_loop` task.
|
||||
fn update_is_active(
|
||||
&mut self,
|
||||
is_wal_backup_required: bool,
|
||||
|
||||
@@ -61,9 +61,7 @@ pub(crate) fn is_wal_backup_required(
|
||||
state: &StateSnapshot,
|
||||
) -> bool {
|
||||
num_computes > 0 ||
|
||||
// This task backups completed segments only.
|
||||
// The current partial segment is backed up by a separate task/code module (wal_backup_partial).
|
||||
// So, need for completed segment backup <=> last backup was at at older segment.
|
||||
// Currently only the whole segment is offloaded, so compare segment numbers.
|
||||
(state.commit_lsn.segment_number(wal_seg_size) > state.backup_lsn.segment_number(wal_seg_size))
|
||||
}
|
||||
|
||||
@@ -71,11 +69,6 @@ pub(crate) fn is_wal_backup_required(
|
||||
/// is me, run (per timeline) task, if not yet. OTOH, if it is not me and task
|
||||
/// is running, kill it.
|
||||
pub(crate) async fn update_task(mgr: &mut Manager, need_backup: bool, state: &StateSnapshot) {
|
||||
// Based on the peer information received from broker, each safekeeper figures out
|
||||
// whether it, or one of the peers, is the offloader.
|
||||
// The algorithm is deterministic, so, if all peers have the same information,
|
||||
// the system converges. In unconverged state, multiple peers upload the same
|
||||
// segments, which is inefficient but safe.
|
||||
let (offloader, election_dbg_str) =
|
||||
determine_offloader(&state.peers, state.backup_lsn, mgr.tli.ttid, &mgr.conf);
|
||||
let elected_me = Some(mgr.conf.my_id) == offloader;
|
||||
|
||||
@@ -1,52 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Usage: ./modelcheck.sh <config_file> <spec_file>, e.g.
|
||||
# ./modelcheck.sh models/MCProposerAcceptorStatic_p2_a3_t3_l3.cfg MCProposerAcceptorStatic.tla
|
||||
# ./modelcheck.sh models/MCProposerAcceptorReconfig_p2_a3_t3_l3_c3.cfg MCProposerAcceptorReconfig.tla
|
||||
CONFIG=$1
|
||||
SPEC=$2
|
||||
|
||||
MEM=4G
|
||||
TOOLSPATH=/Applications/TLA+\ Toolbox.app/Contents/Eclipse/tla2tools.jar
|
||||
|
||||
mkdir -p "tlc-results"
|
||||
CONFIG_FILE=$(basename -- "$CONFIG")
|
||||
outfilename="$SPEC-${CONFIG_FILE}-$(date --utc +%Y-%m-%d--%H-%M-%S)".log
|
||||
outfile="tlc-results/$outfilename"
|
||||
echo "saving results to $outfile"
|
||||
touch $outfile
|
||||
|
||||
# Save some info about the run.
|
||||
GIT_REV=`git rev-parse --short HEAD`
|
||||
INFO=`uname -a`
|
||||
|
||||
# First for Linux, second for Mac.
|
||||
CPUNAMELinux=$(lscpu | grep 'Model name' | cut -f 2 -d ":" | awk '{$1=$1}1')
|
||||
CPUCORESLinux=`nproc`
|
||||
CPUNAMEMac=`sysctl -n machdep.cpu.brand_string`
|
||||
CPUCORESMac=`sysctl -n machdep.cpu.thread_count`
|
||||
|
||||
echo "git revision: $GIT_REV" >> $outfile
|
||||
echo "Platform: $INFO" >> $outfile
|
||||
echo "CPU Info Linux: $CPUNAMELinux" >> $outfile
|
||||
echo "CPU Cores Linux: $CPUCORESLinux" >> $outfile
|
||||
echo "CPU Info Mac: $CPUNAMEMac" >> $outfile
|
||||
echo "CPU Cores Mac: $CPUCORESMac" >> $outfile
|
||||
echo "Spec: $SPEC" >> $outfile
|
||||
echo "Config: $CONFIG" >> $outfile
|
||||
echo "----" >> $outfile
|
||||
cat $CONFIG >> $outfile
|
||||
echo "" >> $outfile
|
||||
echo "----" >> $outfile
|
||||
echo "" >> $outfile
|
||||
|
||||
# see
|
||||
# https://lamport.azurewebsites.net/tla/current-tools.pdf
|
||||
# for TLC options.
|
||||
# OffHeapDiskFPSet is the optimal fingerprint set implementation
|
||||
# https://docs.tlapl.us/codebase:architecture#fingerprint_sets_fpsets
|
||||
#
|
||||
# Add -simulate to run in infinite simulation mode.
|
||||
# -coverage 1 is useful for profiling (check how many times actions are taken).
|
||||
java -Xmx$MEM -XX:MaxDirectMemorySize=$MEM -XX:+UseParallelGC -Dtlc2.tool.fp.FPSet.impl=tlc2.tool.fp.OffHeapDiskFPSet \
|
||||
-cp "${TOOLSPATH}" tlc2.TLC $SPEC -config $CONFIG -workers auto -gzip | tee -a $outfile
|
||||
@@ -1,52 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Usage: ./modelcheck.sh <config_file> <spec_file>, e.g.
|
||||
# ./modelcheck.sh models/MCProposerAcceptorStatic_p2_a3_t3_l3.cfg MCProposerAcceptorStatic.tla
|
||||
# ./modelcheck.sh models/MCProposerAcceptorReconfig_p2_a3_t3_l3_c3.cfg MCProposerAcceptorReconfig.tla
|
||||
CONFIG=$1
|
||||
SPEC=$2
|
||||
|
||||
MEM=7G
|
||||
TOOLSPATH="/opt/TLA+Toolbox/tla2tools.jar"
|
||||
|
||||
mkdir -p "tlc-results"
|
||||
CONFIG_FILE=$(basename -- "$CONFIG")
|
||||
outfilename="$SPEC-${CONFIG_FILE}-$(date --utc +%Y-%m-%d--%H-%M-%S)".log
|
||||
outfile="tlc-results/$outfilename"
|
||||
echo "saving results to $outfile"
|
||||
touch $outfile
|
||||
|
||||
# Save some info about the run.
|
||||
GIT_REV=`git rev-parse --short HEAD`
|
||||
INFO=`uname -a`
|
||||
|
||||
# First for Linux, second for Mac.
|
||||
CPUNAMELinux=$(lscpu | grep 'Model name' | cut -f 2 -d ":" | awk '{$1=$1}1')
|
||||
CPUCORESLinux=`nproc`
|
||||
CPUNAMEMac=`sysctl -n machdep.cpu.brand_string`
|
||||
CPUCORESMac=`sysctl -n machdep.cpu.thread_count`
|
||||
|
||||
echo "git revision: $GIT_REV" >> $outfile
|
||||
echo "Platform: $INFO" >> $outfile
|
||||
echo "CPU Info Linux: $CPUNAMELinux" >> $outfile
|
||||
echo "CPU Cores Linux: $CPUCORESLinux" >> $outfile
|
||||
echo "CPU Info Mac: $CPUNAMEMac" >> $outfile
|
||||
echo "CPU Cores Mac: $CPUCORESMac" >> $outfile
|
||||
echo "Spec: $SPEC" >> $outfile
|
||||
echo "Config: $CONFIG" >> $outfile
|
||||
echo "----" >> $outfile
|
||||
cat $CONFIG >> $outfile
|
||||
echo "" >> $outfile
|
||||
echo "----" >> $outfile
|
||||
echo "" >> $outfile
|
||||
|
||||
# see
|
||||
# https://lamport.azurewebsites.net/tla/current-tools.pdf
|
||||
# for TLC options.
|
||||
# OffHeapDiskFPSet is the optimal fingerprint set implementation
|
||||
# https://docs.tlapl.us/codebase:architecture#fingerprint_sets_fpsets
|
||||
#
|
||||
# Add -simulate to run in infinite simulation mode.
|
||||
# -coverage 1 is useful for profiling (check how many times actions are taken).
|
||||
java -Xmx$MEM -XX:MaxDirectMemorySize=$MEM -XX:+UseParallelGC -Dtlc2.tool.fp.FPSet.impl=tlc2.tool.fp.OffHeapDiskFPSet \
|
||||
-cp "${TOOLSPATH}" tlc2.TLC $SPEC -config $CONFIG -workers auto -gzip | tee -a $outfile
|
||||
BIN
storage_broker/spec/replicated/.DS_Store
vendored
BIN
storage_broker/spec/replicated/.DS_Store
vendored
Binary file not shown.
@@ -1,175 +0,0 @@
|
||||
---- MODULE replicated ----
|
||||
|
||||
EXTENDS Integers, FiniteSets
|
||||
|
||||
VARIABLES broker_state, safekeeper_state, pageserver_state, online
|
||||
|
||||
|
||||
CONSTANT
|
||||
brokers,
|
||||
safekeepers,
|
||||
pageservers,
|
||||
azs,
|
||||
az_mapping
|
||||
|
||||
CONSTANT
|
||||
NULL
|
||||
|
||||
CONSTANT
|
||||
max_commit_lsn
|
||||
|
||||
\* HELPERS
|
||||
|
||||
Max(a,b) == IF a > b THEN a ELSE b
|
||||
|
||||
(* The minimum of a non-empty set of integers *)
|
||||
MinOfSet(S) ==
|
||||
CHOOSE x \in S: \A y \in S: x <= y
|
||||
|
||||
(* The minimum of a non-empty set of integers *)
|
||||
MaxOfSet(S) ==
|
||||
CHOOSE x \in S: \A y \in S: x >= y
|
||||
|
||||
\* END HELPERS
|
||||
|
||||
StateConstraint ==
|
||||
/\ \A s \in safekeepers:
|
||||
/\ safekeeper_state[s].commit_lsn <= max_commit_lsn
|
||||
/\ \A b \in brokers:
|
||||
/\ \A s \in DOMAIN broker_state[b].sk:
|
||||
/\ broker_state[b].sk[s].commit_lsn <= max_commit_lsn
|
||||
|
||||
|
||||
|
||||
InitSafekeeper == [prune_lsn |-> 0, commit_lsn |-> 0, rx |-> [b \in brokers |-> NULL] ]
|
||||
InitBroker == [sk |-> [s \in safekeepers |-> [prune_lsn |-> 0, commit_lsn |-> 0]]]
|
||||
InitPageserver == [last_record_lsn |-> 0, preferred_sk |-> NULL, sk |-> [s \in safekeepers |-> [commit_lsn |-> 0]]]
|
||||
InitOnline == safekeepers \cup brokers \cup pageservers
|
||||
|
||||
Init ==
|
||||
/\ broker_state = [b \in brokers |-> InitBroker]
|
||||
/\ safekeeper_state = [s \in safekeepers |-> InitSafekeeper]
|
||||
/\ pageserver_state = [p \in pageservers |-> InitPageserver]
|
||||
/\ online = InitOnline
|
||||
|
||||
NodeOnlineOffline ==
|
||||
/\ online' = CHOOSE ss \in SUBSET InitOnline:
|
||||
/\ Cardinality(ss \cap safekeepers) >= 2
|
||||
/\ Cardinality(ss \cap brokers) >= 2
|
||||
/\ ss \cap pageservers = pageservers \* assume no PS failures for now
|
||||
/\ UNCHANGED <<safekeeper_state,broker_state,pageserver_state>>
|
||||
|
||||
SkCommit(s1, s2) ==
|
||||
/\ {s1, s2} \subseteq online
|
||||
/\ s1 # s2
|
||||
/\ safekeeper_state[s1].commit_lsn = safekeeper_state[s2].commit_lsn
|
||||
/\ LET
|
||||
new_commit_lsn == safekeeper_state[s1].commit_lsn + 1
|
||||
IN
|
||||
safekeeper_state' = [safekeeper_state EXCEPT
|
||||
![s1].commit_lsn = new_commit_lsn,
|
||||
![s2].commit_lsn = new_commit_lsn]
|
||||
/\ UNCHANGED <<broker_state, pageserver_state,online>>
|
||||
|
||||
SkPeerRecovery(s1,s2) ==
|
||||
/\ {s1, s2} \subseteq online
|
||||
/\ safekeeper_state[s1].commit_lsn < safekeeper_state[s2].commit_lsn \* s2 has more WAL than s1
|
||||
/\ safekeeper_state[s2].prune_lsn < safekeeper_state[s1].commit_lsn \* s2 has not yet trimmed the WAL the WAL
|
||||
/\ safekeeper_state' = [safekeeper_state EXCEPT![s1].commit_lsn = safekeeper_state[s2].commit_lsn]
|
||||
/\ UNCHANGED <<broker_state, pageserver_state,online>>
|
||||
|
||||
SkPushToBroker(s,b) ==
|
||||
/\ {s, b} \subseteq online
|
||||
/\ broker_state' = LET
|
||||
broker_side == broker_state[b].sk[s]
|
||||
sk_side == safekeeper_state[s]
|
||||
merged == [broker_side EXCEPT
|
||||
!["commit_lsn"] = Max(broker_side.commit_lsn, sk_side.commit_lsn),
|
||||
!["prune_lsn"] = Max(broker_side.prune_lsn, sk_side.prune_lsn)]
|
||||
IN
|
||||
[broker_state EXCEPT ![b].sk[s] = merged]
|
||||
/\ UNCHANGED <<safekeeper_state, pageserver_state,online>>
|
||||
|
||||
PsRecvBroker(b,p,s) ==
|
||||
/\ {b,p,s} \subseteq online
|
||||
/\ LET
|
||||
bsk == broker_state[b].sk[s]
|
||||
psk == pageserver_state[p].sk[s]
|
||||
updpsk == [psk EXCEPT !["commit_lsn"] = bsk.commit_lsn]
|
||||
IN
|
||||
pageserver_state' = IF bsk.commit_lsn > psk.commit_lsn
|
||||
THEN [pageserver_state EXCEPT ![p].sk[s] = updpsk]
|
||||
ELSE pageserver_state
|
||||
/\ UNCHANGED <<safekeeper_state, broker_state,online>>
|
||||
|
||||
|
||||
SkRecvBroker(b,s) ==
|
||||
/\ {b,s} \subseteq online
|
||||
/\ safekeeper_state' = [safekeeper_state EXCEPT ![s].rx[b] = broker_state[b]]
|
||||
/\ UNCHANGED <<pageserver_state,broker_state,online>>
|
||||
|
||||
SkPrune(s) ==
|
||||
/\ {s} \subseteq online
|
||||
/\ LET
|
||||
sk == safekeeper_state[s]
|
||||
skis == {<<b,DOMAIN bi.sk>>: <<b,bi>> \in {<<b,bi>> \in { <<b,sk.rx[b]>>: b \in sk.rx }: bi # NULL} }
|
||||
|
||||
IN
|
||||
safekeeper_state' = safekeeper_state
|
||||
/\ UNCHANGED <<pageserver_state,broker_state,online>>
|
||||
|
||||
|
||||
SksWithNewerWal(p) ==
|
||||
LET
|
||||
ps == pageserver_state[p]
|
||||
IN
|
||||
{s \in DOMAIN ps.sk: ps.sk[s].commit_lsn > ps.last_record_lsn}
|
||||
|
||||
PsChooseSk(p) ==
|
||||
/\ {p} \subseteq online
|
||||
/\ SksWithNewerWal(p) # {}
|
||||
/\ pageserver_state' = [pageserver_state EXCEPT![p].preferred_sk = CHOOSE s \in SksWithNewerWal(p): TRUE]
|
||||
/\ UNCHANGED <<safekeeper_state, broker_state,online>>
|
||||
|
||||
|
||||
Next ==
|
||||
\/ NodeOnlineOffline
|
||||
\/ \E s1 \in safekeepers: \E s2 \in safekeepers:
|
||||
\/ SkCommit(s1, s2)
|
||||
\/ SkPeerRecovery(s1, s2)
|
||||
\/ \E s \in safekeepers: \E b \in brokers:
|
||||
\/ SkPushToBroker(s, b)
|
||||
\/ SkRecvBroker(b, s)
|
||||
\/ \E s \in safekeepers:
|
||||
\/ SkPrune(s)
|
||||
\/ \E s \in safekeepers: \E b \in brokers: \E p \in pageservers: PsRecvBroker(b,p,s)
|
||||
\/ \E p \in pageservers: PsChooseSk(p)
|
||||
|
||||
|
||||
Spec == Init /\ [][Next]_<< broker_state, safekeeper_state, pageserver_state,online>>
|
||||
|
||||
|
||||
\* invariants
|
||||
|
||||
PsLagsSk == \A p \in pageservers: \A s \in DOMAIN pageserver_state[p].sk:
|
||||
/\ pageserver_state[p].sk[s].commit_lsn <= safekeeper_state[s].commit_lsn
|
||||
|
||||
PeerRecoveryIsPossible ==
|
||||
\A laggard \in (safekeepers \cap online): \E donor \in (safekeepers \cap online):
|
||||
/\ (safekeeper_state[laggard].commit_lsn < safekeeper_state[donor].commit_lsn)
|
||||
=> safekeeper_state[donor].prune_lsn <= safekeeper_state[laggard].commit_lsn
|
||||
|
||||
|
||||
EventuallyLaggingSkIsNotPreferredSk == <>(
|
||||
LET
|
||||
sks == safekeeper_state
|
||||
lagging_sks == { s \in safekeepers: \A s2 \in safekeepers: sks[s].commit_lsn <= sks[s2].commit_lsn }
|
||||
preferred_sks == {pageserver_state[p].preferred_sk: p \in pageservers}
|
||||
IN
|
||||
preferred_sks \cap lagging_sks = {}
|
||||
|
||||
)
|
||||
|
||||
|
||||
|
||||
====
|
||||
@@ -1,24 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<projectDescription>
|
||||
<name>replicated</name>
|
||||
<comment></comment>
|
||||
<projects>
|
||||
</projects>
|
||||
<buildSpec>
|
||||
<buildCommand>
|
||||
<name>toolbox.builder.TLAParserBuilder</name>
|
||||
<arguments>
|
||||
</arguments>
|
||||
</buildCommand>
|
||||
</buildSpec>
|
||||
<natures>
|
||||
<nature>toolbox.natures.TLANature</nature>
|
||||
</natures>
|
||||
<linkedResources>
|
||||
<link>
|
||||
<name>replicated.tla</name>
|
||||
<type>1</type>
|
||||
<locationURI>PARENT-1-PROJECT_LOC/replicated.tla</locationURI>
|
||||
</link>
|
||||
</linkedResources>
|
||||
</projectDescription>
|
||||
@@ -1,2 +0,0 @@
|
||||
ProjectRootFile=PARENT-1-PROJECT_LOC/replicated.tla
|
||||
eclipse.preferences.version=1
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user