mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-31 12:00:42 +00:00
Compare commits
14 Commits
conrad/dyn
...
jcsp/stabi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f9fcb176e8 | ||
|
|
5e507776bc | ||
|
|
4e8e0951be | ||
|
|
64a8d0c2e6 | ||
|
|
7602e6ffc0 | ||
|
|
17193d6a33 | ||
|
|
03ae57236f | ||
|
|
e3d27b2f68 | ||
|
|
dd1299f337 | ||
|
|
cb19e4e05d | ||
|
|
9df230c837 | ||
|
|
3c2bc5baba | ||
|
|
6667810800 | ||
|
|
47f5bcf2bc |
31
.github/scripts/push_with_image_map.py
vendored
31
.github/scripts/push_with_image_map.py
vendored
@@ -11,12 +11,27 @@ try:
|
||||
except json.JSONDecodeError as e:
|
||||
raise ValueError("Failed to parse IMAGE_MAP as JSON") from e
|
||||
|
||||
for source, targets in parsed_image_map.items():
|
||||
for target in targets:
|
||||
cmd = ["docker", "buildx", "imagetools", "create", "-t", target, source]
|
||||
print(f"Running: {' '.join(cmd)}")
|
||||
result = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
||||
failures = []
|
||||
|
||||
if result.returncode != 0:
|
||||
print(f"Error: {result.stdout}")
|
||||
raise RuntimeError(f"Command failed: {' '.join(cmd)}")
|
||||
pending = [(source, target) for source, targets in parsed_image_map.items() for target in targets]
|
||||
|
||||
while len(pending) > 0:
|
||||
if len(failures) > 10:
|
||||
print("Error: more than 10 failures!")
|
||||
for failure in failures:
|
||||
print(f'"{failure[0]}" failed with the following output:')
|
||||
print(failure[1])
|
||||
raise RuntimeError("Retry limit reached.")
|
||||
|
||||
source, target = pending.pop(0)
|
||||
cmd = ["docker", "buildx", "imagetools", "create", "-t", target, source]
|
||||
print(f"Running: {' '.join(cmd)}")
|
||||
result = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
||||
|
||||
if result.returncode != 0:
|
||||
failures.append((" ".join(cmd), result.stdout))
|
||||
pending.append((source, target))
|
||||
|
||||
if len(failures) > 0 and (github_output := os.getenv("GITHUB_OUTPUT")):
|
||||
with open(github_output, "a") as f:
|
||||
f.write("slack_notify=true\n")
|
||||
|
||||
@@ -104,6 +104,18 @@ jobs:
|
||||
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
|
||||
|
||||
- name: Copy docker images to target registries
|
||||
id: push
|
||||
run: python3 .github/scripts/push_with_image_map.py
|
||||
env:
|
||||
IMAGE_MAP: ${{ inputs.image-map }}
|
||||
|
||||
- name: Notify Slack if container image pushing fails
|
||||
if: steps.push.outputs.slack_notify == 'true' || failure()
|
||||
uses: slackapi/slack-github-action@485a9d42d3a73031f12ec201c457e2162c45d02d # v2.0.0
|
||||
with:
|
||||
method: chat.postMessage
|
||||
token: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
payload: |
|
||||
channel: ${{ vars.SLACK_ON_CALL_DEVPROD_STREAM }}
|
||||
text: |
|
||||
Pushing container images failed in <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
|
||||
|
||||
25
.github/workflows/build_and_test.yml
vendored
25
.github/workflows/build_and_test.yml
vendored
@@ -89,8 +89,8 @@ jobs:
|
||||
|
||||
check-codestyle-python:
|
||||
needs: [ meta, check-permissions, build-build-tools-image ]
|
||||
# No need to run on `main` because we this in the merge queue
|
||||
if: ${{ needs.meta.outputs.run-kind == 'pr' }}
|
||||
# No need to run on `main` because we this in the merge queue. We do need to run this in `.*-rc-pr` because of hotfixes.
|
||||
if: ${{ contains(fromJSON('["pr", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
|
||||
uses: ./.github/workflows/_check-codestyle-python.yml
|
||||
with:
|
||||
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
|
||||
@@ -98,7 +98,8 @@ jobs:
|
||||
|
||||
check-codestyle-jsonnet:
|
||||
needs: [ meta, check-permissions, build-build-tools-image ]
|
||||
if: ${{ contains(fromJSON('["pr", "push-main"]'), needs.meta.outputs.run-kind) }}
|
||||
# We do need to run this in `.*-rc-pr` because of hotfixes.
|
||||
if: ${{ contains(fromJSON('["pr", "push-main", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
|
||||
runs-on: [ self-hosted, small ]
|
||||
container:
|
||||
image: ${{ needs.build-build-tools-image.outputs.image }}
|
||||
@@ -181,8 +182,8 @@ jobs:
|
||||
|
||||
check-codestyle-rust:
|
||||
needs: [ meta, check-permissions, build-build-tools-image ]
|
||||
# No need to run on `main` because we this in the merge queue
|
||||
if: ${{ needs.meta.outputs.run-kind == 'pr' }}
|
||||
# No need to run on `main` because we this in the merge queue. We do need to run this in `.*-rc-pr` because of hotfixes.
|
||||
if: ${{ contains(fromJSON('["pr", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
|
||||
uses: ./.github/workflows/_check-codestyle-rust.yml
|
||||
with:
|
||||
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
|
||||
@@ -191,7 +192,8 @@ jobs:
|
||||
|
||||
check-dependencies-rust:
|
||||
needs: [ meta, files-changed, build-build-tools-image ]
|
||||
if: ${{ needs.files-changed.outputs.check-rust-dependencies == 'true' && needs.meta.outputs.run-kind == 'pr' }}
|
||||
# No need to run on `main` because we this in the merge queue. We do need to run this in `.*-rc-pr` because of hotfixes.
|
||||
if: ${{ needs.files-changed.outputs.check-rust-dependencies == 'true' && contains(fromJSON('["pr", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
|
||||
uses: ./.github/workflows/cargo-deny.yml
|
||||
with:
|
||||
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
|
||||
@@ -199,7 +201,8 @@ jobs:
|
||||
|
||||
build-and-test-locally:
|
||||
needs: [ meta, build-build-tools-image ]
|
||||
if: ${{ contains(fromJSON('["pr", "push-main"]'), needs.meta.outputs.run-kind) }}
|
||||
# We do need to run this in `.*-rc-pr` because of hotfixes.
|
||||
if: ${{ contains(fromJSON('["pr", "push-main", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
@@ -1565,10 +1568,10 @@ jobs:
|
||||
if: |
|
||||
contains(needs.*.result, 'failure')
|
||||
|| contains(needs.*.result, 'cancelled')
|
||||
|| (needs.check-dependencies-rust.result == 'skipped' && needs.files-changed.outputs.check-rust-dependencies == 'true' && needs.meta.outputs.run-kind == 'pr')
|
||||
|| (needs.build-and-test-locally.result == 'skipped' && needs.meta.outputs.run-kind == 'pr')
|
||||
|| (needs.check-codestyle-python.result == 'skipped' && needs.meta.outputs.run-kind == 'pr')
|
||||
|| (needs.check-codestyle-rust.result == 'skipped' && needs.meta.outputs.run-kind == 'pr')
|
||||
|| (needs.check-dependencies-rust.result == 'skipped' && needs.files-changed.outputs.check-rust-dependencies == 'true' && contains(fromJSON('["pr", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|
||||
|| (needs.build-and-test-locally.result == 'skipped' && contains(fromJSON('["pr", "push-main", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|
||||
|| (needs.check-codestyle-python.result == 'skipped' && contains(fromJSON('["pr", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|
||||
|| (needs.check-codestyle-rust.result == 'skipped' && contains(fromJSON('["pr", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|
||||
|| needs.files-changed.result == 'skipped'
|
||||
|| (needs.push-compute-image-dev.result == 'skipped' && contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|
||||
|| (needs.push-neon-image-dev.result == 'skipped' && contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind))
|
||||
|
||||
@@ -369,7 +369,7 @@ FROM build-deps AS plv8-src
|
||||
ARG PG_VERSION
|
||||
WORKDIR /ext-src
|
||||
|
||||
COPY compute/patches/plv8-3.1.10.patch .
|
||||
COPY compute/patches/plv8* .
|
||||
|
||||
# plv8 3.2.3 supports v17
|
||||
# last release v3.2.3 - Sep 7, 2024
|
||||
@@ -393,7 +393,7 @@ RUN case "${PG_VERSION:?}" in \
|
||||
git clone --recurse-submodules --depth 1 --branch ${PLV8_TAG} https://github.com/plv8/plv8.git plv8-src && \
|
||||
tar -czf plv8.tar.gz --exclude .git plv8-src && \
|
||||
cd plv8-src && \
|
||||
if [[ "${PG_VERSION:?}" < "v17" ]]; then patch -p1 < /ext-src/plv8-3.1.10.patch; fi
|
||||
if [[ "${PG_VERSION:?}" < "v17" ]]; then patch -p1 < /ext-src/plv8_v3.1.10.patch; else patch -p1 < /ext-src/plv8_v3.2.3.patch; fi
|
||||
|
||||
# Step 1: Build the vendored V8 engine. It doesn't depend on PostgreSQL, so use
|
||||
# 'build-deps' as the base. This enables caching and avoids unnecessary rebuilds.
|
||||
|
||||
@@ -1,12 +1,6 @@
|
||||
commit 46b38d3e46f9cd6c70d9b189dd6ff4abaa17cf5e
|
||||
Author: Alexander Bayandin <alexander@neon.tech>
|
||||
Date: Sat Nov 30 18:29:32 2024 +0000
|
||||
|
||||
Fix v8 9.7.37 compilation on Debian 12
|
||||
|
||||
diff --git a/patches/code/84cf3230a9680aac3b73c410c2b758760b6d3066.patch b/patches/code/84cf3230a9680aac3b73c410c2b758760b6d3066.patch
|
||||
new file mode 100644
|
||||
index 0000000..f0a5dc7
|
||||
index 0000000..fae1cb3
|
||||
--- /dev/null
|
||||
+++ b/patches/code/84cf3230a9680aac3b73c410c2b758760b6d3066.patch
|
||||
@@ -0,0 +1,30 @@
|
||||
@@ -35,8 +29,21 @@ index 0000000..f0a5dc7
|
||||
+@@ -5,6 +5,7 @@
|
||||
+ #ifndef V8_HEAP_CPPGC_PREFINALIZER_HANDLER_H_
|
||||
+ #define V8_HEAP_CPPGC_PREFINALIZER_HANDLER_H_
|
||||
+
|
||||
+
|
||||
++#include <utility>
|
||||
+ #include <vector>
|
||||
+
|
||||
+
|
||||
+ #include "include/cppgc/prefinalizer.h"
|
||||
diff --git a/plv8.cc b/plv8.cc
|
||||
index c1ce883..6e47e94 100644
|
||||
--- a/plv8.cc
|
||||
+++ b/plv8.cc
|
||||
@@ -379,7 +379,7 @@ _PG_init(void)
|
||||
NULL,
|
||||
&plv8_v8_flags,
|
||||
NULL,
|
||||
- PGC_USERSET, 0,
|
||||
+ PGC_SUSET, 0,
|
||||
#if PG_VERSION_NUM >= 90100
|
||||
NULL,
|
||||
#endif
|
||||
13
compute/patches/plv8_v3.2.3.patch
Normal file
13
compute/patches/plv8_v3.2.3.patch
Normal file
@@ -0,0 +1,13 @@
|
||||
diff --git a/plv8.cc b/plv8.cc
|
||||
index edfa2aa..623e7f2 100644
|
||||
--- a/plv8.cc
|
||||
+++ b/plv8.cc
|
||||
@@ -385,7 +385,7 @@ _PG_init(void)
|
||||
NULL,
|
||||
&plv8_v8_flags,
|
||||
NULL,
|
||||
- PGC_USERSET, 0,
|
||||
+ PGC_SUSET, 0,
|
||||
#if PG_VERSION_NUM >= 90100
|
||||
NULL,
|
||||
#endif
|
||||
@@ -188,7 +188,7 @@ impl ComputeState {
|
||||
|
||||
COMPUTE_CTL_UP.reset();
|
||||
COMPUTE_CTL_UP
|
||||
.with_label_values(&[&BUILD_TAG, format!("{}", status).as_str()])
|
||||
.with_label_values(&[&BUILD_TAG, status.to_string().as_str()])
|
||||
.set(1);
|
||||
}
|
||||
|
||||
@@ -360,6 +360,14 @@ impl ComputeNode {
|
||||
this.prewarm_postgres()?;
|
||||
}
|
||||
|
||||
// Set the up metric with Empty status before starting the HTTP server.
|
||||
// That way on the first metric scrape, an external observer will see us
|
||||
// as 'up' and 'empty' (unless the compute was started with a spec or
|
||||
// already configured by control plane).
|
||||
COMPUTE_CTL_UP
|
||||
.with_label_values(&[&BUILD_TAG, ComputeStatus::Empty.to_string().as_str()])
|
||||
.set(1);
|
||||
|
||||
// Launch the external HTTP server first, so that we can serve control plane
|
||||
// requests while configuration is still in progress.
|
||||
crate::http::server::Server::External {
|
||||
@@ -369,19 +377,13 @@ impl ComputeNode {
|
||||
}
|
||||
.launch(&this);
|
||||
|
||||
// The internal HTTP server is needed for a further activation by control plane
|
||||
// if compute was started for a pool, so we have to start server before hanging
|
||||
// waiting for a spec.
|
||||
// The internal HTTP server could be launched later, but there isn't much
|
||||
// sense in waiting.
|
||||
crate::http::server::Server::Internal {
|
||||
port: this.params.internal_http_port,
|
||||
}
|
||||
.launch(&this);
|
||||
|
||||
// HTTP server is running, so we can officially declare compute_ctl as 'up'
|
||||
COMPUTE_CTL_UP
|
||||
.with_label_values(&[&BUILD_TAG, ComputeStatus::Empty.to_string().as_str()])
|
||||
.set(1);
|
||||
|
||||
// If we got a spec from the CLI already, use that. Otherwise wait for the
|
||||
// control plane to pass it to us with a /configure HTTP request
|
||||
let pspec = if let Some(cli_spec) = cli_spec {
|
||||
|
||||
@@ -59,9 +59,12 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
|
||||
Box::pin(async move {
|
||||
let request_id = request.extract_parts::<RequestId>().await.unwrap();
|
||||
|
||||
// TODO: Remove this check after a successful rollout
|
||||
if jwks.keys.is_empty() {
|
||||
warn!(%request_id, "Authorization has not been configured");
|
||||
// TODO: Remove this stanza after teaching neon_local and the
|
||||
// regression tests to use a JWT + JWKS.
|
||||
//
|
||||
// https://github.com/neondatabase/neon/issues/11316
|
||||
if cfg!(feature = "testing") {
|
||||
warn!(%request_id, "Skipping compute_ctl authorization check");
|
||||
|
||||
return Ok(request);
|
||||
}
|
||||
@@ -110,8 +113,6 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
|
||||
impl Authorize {
|
||||
/// Verify the token using the JSON Web Key set and return the token data.
|
||||
fn verify(jwks: &JwkSet, token: &str, validation: &Validation) -> Result<TokenData<Claims>> {
|
||||
debug_assert!(!jwks.keys.is_empty());
|
||||
|
||||
for jwk in jwks.keys.iter() {
|
||||
let decoding_key = match DecodingKey::from_jwk(jwk) {
|
||||
Ok(key) => key,
|
||||
|
||||
@@ -21,6 +21,7 @@ in this repository.
|
||||
- [WAL Redo](./pageserver-walredo.md)
|
||||
- [Page cache](./pageserver-pagecache.md)
|
||||
- [Storage](./pageserver-storage.md)
|
||||
- [Compaction](./pageserver-compaction.md)
|
||||
- [Processing a GetPage request](./pageserver-processing-getpage.md)
|
||||
- [Processing WAL](./pageserver-processing-wal.md)
|
||||
|
||||
|
||||
110
docs/pageserver-compaction.md
Normal file
110
docs/pageserver-compaction.md
Normal file
@@ -0,0 +1,110 @@
|
||||
# Pageserver Compaction
|
||||
|
||||
Lifted from <https://www.notion.so/neondatabase/Rough-Notes-on-Compaction-1baf189e004780859e65ef63b85cfa81?pvs=4>.
|
||||
|
||||
Updated 2025-03-26.
|
||||
|
||||
## Pages and WAL
|
||||
|
||||
Postgres stores data in 8 KB pages, identified by a page number.
|
||||
|
||||
The WAL contains a sequence of page writes: either images (complete page contents) or deltas (patches applied to images). Each write is identified by its byte position in the WAL, aka LSN.
|
||||
|
||||
Each page version is thus identified by page@LSN. Postgres may read pages at past LSNs.
|
||||
|
||||
Pageservers ingest WAL by writing WAL records into a key/value store keyed by page@LSN.
|
||||
|
||||
Pageservers materialize pages for Postgres reads by finding the most recent page image and applying all subsequent page deltas, up to the read LSN.
|
||||
|
||||
## Compaction: Why?
|
||||
|
||||
Pageservers store page@LSN keys in a key/value store using a custom variant of an LSM tree. Each timeline on each tenant shard has its own LSM tree.
|
||||
|
||||
When Pageservers write new page@LSN entries, they are appended unordered to an ephemeral layer file. When the ephemeral layer file exceeds `checkpoint_distance` (default 256 MB), the key/value pairs are sorted by key and written out to a layer file (for efficient lookups).
|
||||
|
||||
As WAL writes continue, more layer files accumulate.
|
||||
|
||||
Reads must search through the layer files to find the page’s image and deltas. The more layer files accumulate, the more la yer files reads must search through before they find a page image, aka read amplification.
|
||||
|
||||
Compaction’s job is to:
|
||||
|
||||
- Reduce read amplification by reorganizing and combining layer files.
|
||||
- Remove old garbage from layer files.
|
||||
|
||||
As part of this, it may combine several page deltas into a single page image where possible.
|
||||
|
||||
## Compaction: How?
|
||||
|
||||
Neon uses a non-standard variant of an LSM tree made up of two levels of layer files: L0 and L1.
|
||||
|
||||
Compaction runs in two phases: L0→L1 compaction, and L1 image compaction.
|
||||
|
||||
L0 contains a stack of L0 layers at decreasing LSN ranges. These have been flushed sequentially from ephemeral layers. Each L0 layer covers the entire page space (page 0 to ~infinity) and the LSN range that was ingested into it. L0 layers are therefore particularly bad for read amp, since every read must search all L0 layers below the read LSN. For example:
|
||||
|
||||
```
|
||||
| Page 0-99 @ LSN 0400-04ff |
|
||||
| Page 0-99 @ LSN 0300-03ff |
|
||||
| Page 0-99 @ LSN 0200-02ff |
|
||||
| Page 0-99 @ LSN 0100-01ff |
|
||||
| Page 0-99 @ LSN 0000-00ff |
|
||||
```
|
||||
|
||||
L0→L1 compaction takes the bottom-most chunk of L0 layer files of between `compaction_threshold` (default 10) and `compaction_upper_limit` (default 20) layers. It uses merge-sort to write out sorted L1 delta layers of size `compaction_target_size` (default 128 MB).
|
||||
|
||||
L1 typically consists of a “bed” of image layers with materialized page images at a specific LSN, and then delta layers of various page/LSN ranges above them with page deltas. For example:
|
||||
|
||||
```
|
||||
Delta layers: | 30-84@0310-04ff |
|
||||
Delta layers: | 10-42@0200-02ff | | 65-92@0174-02aa |
|
||||
Image layers: | 0-39@0100 | 40-79@0100 | 80-99@0100 |
|
||||
```
|
||||
|
||||
L1 image compaction scans across the L1 keyspace at some LSN, materializes page images by reading the image and delta layers below the LSN (via vectored reads), and writes out new sorted image layers of roughly size `compaction_target_size` (default 128 MB) at that LSN.
|
||||
|
||||
Layer files below the new image files’ LSN can be garbage collected when they are no longer needed for PITR.
|
||||
|
||||
Even though the old layer files are not immediately garbage collected, the new image layers help with read amp because reads can stop traversing the layer stack as soon as they encounter a page image.
|
||||
|
||||
## Compaction: When?
|
||||
|
||||
Pageservers run a `compaction_loop` background task for each tenant shard. Every `compaction_period` (default 20 seconds) it will wake up and check if any of the shard’s timelines need compaction. Additionally, L0 layer flushes will eagerly wake the compaction loop if the L0 count exceeds `compaction_threshold` (default 10).
|
||||
|
||||
L0 compaction runs if the number of L0 layers exceeds `compaction_threshold` (default 10).
|
||||
|
||||
L1 image compaction runs across sections of the L1 keyspace that have at least `image_creation_threshold` (default 3) delta layers overlapping image layers.
|
||||
|
||||
At most `CONCURRENT_BACKGROUND_TASKS` (default 3 / 4 * CPUs = 6) background tasks can run concurrently on a Pageserver, including compaction. Further compaction tasks must wait.
|
||||
|
||||
Because L0 layers cause the most read amp (they overlap the entire keyspace and only contain page deltas), they are aggressively compacted down:
|
||||
|
||||
- L0 is compacted down across all tenant timelines before L1 compaction is attempted (`compaction_l0_first`).
|
||||
- L0 compaction uses a separate concurrency limit of `CONCURRENT_L0_COMPACTION_TASKS` (default 3 / 4 * CPUs = 6) to avoid waiting for other tasks (`compaction_l0_semaphore`).
|
||||
- If L0 compaction is needed on any tenant timeline, L1 image compaction will yield to start an immediate L0 compaction run (except for compaction run via admin APIs).
|
||||
|
||||
## Backpressure
|
||||
|
||||
With sustained heavy write loads, new L0 layers may be flushed faster than they can be compacted down. This can cause an unbounded buildup of read amplification and compaction debt, which can take hours to resolve even after the writes stop.
|
||||
|
||||
To avoid this and allow compaction to keep up, layer flushes will slow writes down to apply backpressure on the workload:
|
||||
|
||||
- At `l0_flush_delay_threshold` (default 30) L0 layers, layer flushes are delayed by the flush duration, such that they take 2x as long.
|
||||
- At `l0_flush_stall_threshold` (default disabled) L0 layers, layer flushes stall entirely until the L0 count falls back below the threshold. This is currently disabled because we don’t trust L0 compaction to be responsive enough.
|
||||
|
||||
This backpressure is propagated to the compute by waiting for layer flushes when WAL ingestion rolls the ephemeral layer. The compute will significantly slow down WAL writes at:
|
||||
|
||||
- `max_replication_write_lag` (default 500 MB), when Pageserver WAL ingestion lags
|
||||
- `max_replication_flush_lag` (default 10 GB), when Pageserver L0 flushes lag
|
||||
|
||||
Combined, this means that the compute will backpressure when there are 30 L0 layers (30 * 256 MB = 7.7 GB) and the Pageserver WAL ingestion lags the compute by 500 MB, for a total of ~8 GB L0+ephemeral compaction debt on a single shard.
|
||||
|
||||
Since we only delay L0 flushes by 2x when backpressuring, and haven’t enabled stalls, it is still possible for read amp to increase unbounded if compaction is too slow (although we haven’t seen this in practice). But this is considered better than stalling flushes and causing unavailability for as long as it takes L0 compaction to react, since we don’t trust it to be fast enough — at the expense of continually increasing read latency and CPU usage for this tenant. We should either enable stalls when we have enough confidence in L0 compaction, or scale the flush delay by the number of L0 layers to apply increasing backpressure.
|
||||
|
||||
## Circuit Breaker
|
||||
|
||||
Compaction can fail, often repeatedly. This can happen e.g. due to data corruption, faulty hardware, S3 outages, etc.
|
||||
|
||||
If compaction fails, the compaction loop will naïvely try and fail again almost immediately. It may only fail after doing a significant amount of wasted work, while holding onto the background task semaphore.
|
||||
|
||||
To avoid repeatedly doing wasted work and starving out other compaction jobs, each tenant has a compaction circuit breaker. After 5 repeated compaction failures, the circuit breaker trips and disables compaction for the next 24 hours, before resetting the breaker and trying again. This disables compaction across all tenant timelines (faulty or not).
|
||||
|
||||
Disabling compaction for a long time is dangerous, since it can lead to unbounded read amp and compaction debt, and continuous workload backpressure. However, continually failing would not help either. Tripped circuit breakers trigger an alert and must be investigated promptly.
|
||||
@@ -1418,11 +1418,6 @@ pub struct TimelineInfo {
|
||||
pub last_record_lsn: Lsn,
|
||||
pub prev_record_lsn: Option<Lsn>,
|
||||
|
||||
/// Legacy field, retained for one version to enable old storage controller to
|
||||
/// decode (it was a mandatory field).
|
||||
#[serde(default, rename = "latest_gc_cutoff_lsn")]
|
||||
pub _unused: Lsn,
|
||||
|
||||
/// The LSN up to which GC has advanced: older data may still exist but it is not available for clients.
|
||||
/// This LSN is not suitable for deciding where to create branches etc: use [`TimelineInfo::min_readable_lsn`] instead,
|
||||
/// as it is easier to reason about.
|
||||
|
||||
@@ -86,17 +86,17 @@ impl Client {
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
/// Get an arbitrary path and returning a streaming Response. This function is suitable
|
||||
/// for pass-through/proxy use cases where we don't care what the response content looks
|
||||
/// like.
|
||||
/// Send an HTTP request to an arbitrary path with a desired HTTP method and returning a streaming
|
||||
/// Response. This function is suitable for pass-through/proxy use cases where we don't care
|
||||
/// what the response content looks like.
|
||||
///
|
||||
/// Use/add one of the properly typed methods below if you know aren't proxying, and
|
||||
/// know what kind of response you expect.
|
||||
pub async fn get_raw(&self, path: String) -> Result<reqwest::Response> {
|
||||
pub async fn op_raw(&self, method: Method, path: String) -> Result<reqwest::Response> {
|
||||
debug_assert!(path.starts_with('/'));
|
||||
let uri = format!("{}{}", self.mgmt_api_endpoint, path);
|
||||
|
||||
let mut req = self.client.request(Method::GET, uri);
|
||||
let mut req = self.client.request(method, uri);
|
||||
if let Some(value) = &self.authorization_header {
|
||||
req = req.header(reqwest::header::AUTHORIZATION, value);
|
||||
}
|
||||
|
||||
@@ -74,8 +74,8 @@ use crate::tenant::size::ModelInputs;
|
||||
use crate::tenant::storage_layer::{IoConcurrency, LayerAccessStatsReset, LayerName};
|
||||
use crate::tenant::timeline::offload::{OffloadError, offload_timeline};
|
||||
use crate::tenant::timeline::{
|
||||
CompactFlags, CompactOptions, CompactRequest, CompactionError, Timeline, WaitLsnTimeout,
|
||||
WaitLsnWaiter, import_pgdata,
|
||||
CompactFlags, CompactOptions, CompactRequest, CompactionError, MarkInvisibleRequest, Timeline,
|
||||
WaitLsnTimeout, WaitLsnWaiter, import_pgdata,
|
||||
};
|
||||
use crate::tenant::{
|
||||
GetTimelineError, LogicalSizeCalculationCause, OffloadedTimeline, PageReconstructError,
|
||||
@@ -445,6 +445,9 @@ async fn build_timeline_info_common(
|
||||
|
||||
let (pitr_history_size, within_ancestor_pitr) = timeline.get_pitr_history_stats();
|
||||
|
||||
// Externally, expose the lowest LSN that can be used to create a branch.
|
||||
// Internally we distinguish between the planned GC cutoff (PITR point) and the "applied" GC cutoff (where we
|
||||
// actually trimmed data to), which can pass each other when PITR is changed.
|
||||
let min_readable_lsn = std::cmp::max(
|
||||
timeline.get_gc_cutoff_lsn(),
|
||||
*timeline.get_applied_gc_cutoff_lsn(),
|
||||
@@ -461,7 +464,6 @@ async fn build_timeline_info_common(
|
||||
initdb_lsn,
|
||||
last_record_lsn,
|
||||
prev_record_lsn: Some(timeline.get_prev_record_lsn()),
|
||||
_unused: Default::default(), // Unused, for legacy decode only
|
||||
min_readable_lsn,
|
||||
applied_gc_cutoff_lsn: *timeline.get_applied_gc_cutoff_lsn(),
|
||||
current_logical_size: current_logical_size.size_dont_care_about_accuracy(),
|
||||
@@ -2335,21 +2337,31 @@ async fn timeline_compact_handler(
|
||||
}
|
||||
|
||||
async fn timeline_mark_invisible_handler(
|
||||
request: Request<Body>,
|
||||
mut request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
let compact_request = json_request_maybe::<Option<MarkInvisibleRequest>>(&mut request).await?;
|
||||
|
||||
let state = get_state(&request);
|
||||
|
||||
let visibility = match compact_request {
|
||||
Some(req) => match req.is_visible {
|
||||
Some(true) => TimelineVisibilityState::Visible,
|
||||
Some(false) | None => TimelineVisibilityState::Invisible,
|
||||
},
|
||||
None => TimelineVisibilityState::Invisible,
|
||||
};
|
||||
|
||||
async {
|
||||
let tenant = state
|
||||
.tenant_manager
|
||||
.get_attached_tenant_shard(tenant_shard_id)?;
|
||||
let timeline = tenant.get_timeline(timeline_id, true)?;
|
||||
timeline.remote_client.schedule_index_upload_for_timeline_invisible_state(TimelineVisibilityState::Invisible).map_err(ApiError::InternalServerError)?;
|
||||
timeline.remote_client.schedule_index_upload_for_timeline_invisible_state(visibility).map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
.instrument(info_span!("manual_timeline_mark_invisible", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
|
||||
|
||||
@@ -3248,17 +3248,23 @@ impl Tenant {
|
||||
async fn housekeeping(&self) {
|
||||
// Call through to all timelines to freeze ephemeral layers as needed. This usually happens
|
||||
// during ingest, but we don't want idle timelines to hold open layers for too long.
|
||||
let timelines = self
|
||||
.timelines
|
||||
.lock()
|
||||
.unwrap()
|
||||
.values()
|
||||
.filter(|tli| tli.is_active())
|
||||
.cloned()
|
||||
.collect_vec();
|
||||
//
|
||||
// We don't do this if the tenant can't upload layers (i.e. it's in stale attachment mode).
|
||||
// We don't run compaction in this case either, and don't want to keep flushing tiny L0
|
||||
// layers that won't be compacted down.
|
||||
if self.tenant_conf.load().location.may_upload_layers_hint() {
|
||||
let timelines = self
|
||||
.timelines
|
||||
.lock()
|
||||
.unwrap()
|
||||
.values()
|
||||
.filter(|tli| tli.is_active())
|
||||
.cloned()
|
||||
.collect_vec();
|
||||
|
||||
for timeline in timelines {
|
||||
timeline.maybe_freeze_ephemeral_layer().await;
|
||||
for timeline in timelines {
|
||||
timeline.maybe_freeze_ephemeral_layer().await;
|
||||
}
|
||||
}
|
||||
|
||||
// Shut down walredo if idle.
|
||||
|
||||
@@ -895,6 +895,12 @@ pub(crate) struct CompactRequest {
|
||||
pub sub_compaction_max_job_size_mb: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, serde::Deserialize)]
|
||||
pub(crate) struct MarkInvisibleRequest {
|
||||
#[serde(default)]
|
||||
pub is_visible: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub(crate) struct CompactOptions {
|
||||
pub flags: EnumSet<CompactFlags>,
|
||||
|
||||
@@ -639,6 +639,15 @@ async fn handle_tenant_timeline_passthrough(
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!("Missing path")));
|
||||
};
|
||||
|
||||
let method = match *req.method() {
|
||||
hyper::Method::GET => reqwest::Method::GET,
|
||||
hyper::Method::POST => reqwest::Method::POST,
|
||||
hyper::Method::PUT => reqwest::Method::PUT,
|
||||
hyper::Method::DELETE => reqwest::Method::DELETE,
|
||||
hyper::Method::PATCH => reqwest::Method::PATCH,
|
||||
_ => return Err(ApiError::BadRequest(anyhow::anyhow!("Unsupported method"))),
|
||||
};
|
||||
|
||||
tracing::info!(
|
||||
"Proxying request for tenant {} ({})",
|
||||
tenant_or_shard_id.tenant_id,
|
||||
@@ -686,7 +695,7 @@ async fn handle_tenant_timeline_passthrough(
|
||||
node.base_url(),
|
||||
service.get_config().pageserver_jwt_token.as_deref(),
|
||||
);
|
||||
let resp = client.get_raw(path).await.map_err(|e|
|
||||
let resp = client.op_raw(method, path).await.map_err(|e|
|
||||
// We return 503 here because if we can't successfully send a request to the pageserver,
|
||||
// either we aren't available or the pageserver is unavailable.
|
||||
ApiError::ResourceUnavailable(format!("Error sending pageserver API request to {node}: {e}").into()))?;
|
||||
@@ -1407,6 +1416,12 @@ async fn handle_upsert_safekeeper(mut req: Request<Body>) -> Result<Response<Bod
|
||||
)));
|
||||
}
|
||||
|
||||
if id <= 0 {
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"id not allowed to be zero or negative: {id}"
|
||||
)));
|
||||
}
|
||||
|
||||
let req = match maybe_forward(req).await {
|
||||
ForwardOutcome::Forwarded(res) => {
|
||||
return res;
|
||||
@@ -2247,6 +2262,17 @@ pub fn make_router(
|
||||
RequestName("v1_tenant_passthrough"),
|
||||
)
|
||||
})
|
||||
// Tenant timeline mark_invisible passthrough to shard zero
|
||||
.put(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/mark_invisible",
|
||||
|r| {
|
||||
tenant_service_handler(
|
||||
r,
|
||||
handle_tenant_timeline_passthrough,
|
||||
RequestName("v1_tenant_timeline_mark_invisible_passthrough"),
|
||||
)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -115,19 +115,17 @@ struct Cli {
|
||||
#[arg(long)]
|
||||
split_threshold: Option<u64>,
|
||||
|
||||
/// Maximum number of shards during autosplits. 0 disables autosplits.
|
||||
// TODO: defaults to 8 for backwards compatibility, should default to 255.
|
||||
#[arg(long, default_value = "8")]
|
||||
/// Maximum number of shards during autosplits. 0 disables autosplits. Defaults
|
||||
/// to 16 as a safety to avoid too many shards by accident.
|
||||
#[arg(long, default_value = "16")]
|
||||
max_split_shards: u8,
|
||||
|
||||
/// Size threshold for initial shard splits of unsharded tenants. 0 disables initial splits.
|
||||
// TODO: defaults to 64 GB for backwards compatibility. Should default to None.
|
||||
#[arg(long, default_value = "68719476736")]
|
||||
initial_split_threshold: u64,
|
||||
#[arg(long)]
|
||||
initial_split_threshold: Option<u64>,
|
||||
|
||||
/// Number of target shards for initial splits. 0 or 1 disables initial splits.
|
||||
// TODO: defaults to 8 for backwards compatibility. Should default to 2.
|
||||
#[arg(long, default_value = "8")]
|
||||
/// Number of target shards for initial splits. 0 or 1 disables initial splits. Defaults to 2.
|
||||
#[arg(long, default_value = "2")]
|
||||
initial_split_shards: u8,
|
||||
|
||||
/// Maximum number of normal-priority reconcilers that may run in parallel
|
||||
@@ -417,7 +415,7 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
tenant_rate_limit: args.tenant_rate_limit,
|
||||
split_threshold: args.split_threshold,
|
||||
max_split_shards: args.max_split_shards,
|
||||
initial_split_threshold: Some(args.initial_split_threshold),
|
||||
initial_split_threshold: args.initial_split_threshold,
|
||||
initial_split_shards: args.initial_split_shards,
|
||||
neon_local_repo_dir: args.neon_local_repo_dir,
|
||||
max_secondary_lag_bytes: args.max_secondary_lag_bytes,
|
||||
|
||||
@@ -1344,6 +1344,8 @@ class NeonEnv:
|
||||
and self.storage_controller_config.get("timelines_onto_safekeepers") is True
|
||||
):
|
||||
for sk_id, sk in enumerate(self.safekeepers):
|
||||
# 0 is an invalid safekeeper id
|
||||
sk_id = sk_id + 1
|
||||
body = {
|
||||
"id": sk_id,
|
||||
"created_at": "2023-10-25T09:11:25Z",
|
||||
|
||||
@@ -118,6 +118,7 @@ DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS = [
|
||||
# failing to connect to them.
|
||||
".*Call to node.*management API.*failed.*receive body.*",
|
||||
".*Call to node.*management API.*failed.*ReceiveBody.*",
|
||||
".*Call to node.*management API still failed after .+ retries, giving up.*",
|
||||
".*Failed to update node .+ after heartbeat round.*error sending request for url.*",
|
||||
".*background_reconcile: failed to fetch top tenants:.*client error \\(Connect\\).*",
|
||||
# Many tests will start up with a node offline
|
||||
|
||||
@@ -853,6 +853,25 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
res_json = res.json()
|
||||
return res_json
|
||||
|
||||
def timeline_mark_invisible(
|
||||
self,
|
||||
tenant_id: TenantId | TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
is_visible: bool | None = None,
|
||||
):
|
||||
data = {
|
||||
"is_visible": is_visible,
|
||||
}
|
||||
|
||||
log.info(
|
||||
f"Requesting marking timeline invisible for {is_visible=}, {tenant_id=}, {timeline_id=}"
|
||||
)
|
||||
res = self.put(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/mark_invisible",
|
||||
json=data,
|
||||
)
|
||||
self.verbose_error(res)
|
||||
|
||||
def timeline_get_timestamp_of_lsn(
|
||||
self, tenant_id: TenantId | TenantShardId, timeline_id: TimelineId, lsn: Lsn
|
||||
):
|
||||
|
||||
@@ -43,7 +43,7 @@ def single_timeline(
|
||||
f"template tenant is template_tenant={template_tenant} template_timeline={template_timeline}"
|
||||
)
|
||||
|
||||
log.info("detach template tenant form pageserver")
|
||||
log.info("detach template tenant from pageserver")
|
||||
env.pageserver.tenant_detach(template_tenant)
|
||||
|
||||
log.info(f"duplicating template tenant {ncopies} times in remote storage")
|
||||
@@ -65,11 +65,13 @@ def single_timeline(
|
||||
assert ps_http.tenant_list() == []
|
||||
|
||||
def attach(tenant):
|
||||
env.pageserver.tenant_attach(
|
||||
tenant,
|
||||
config=template_config.copy(),
|
||||
generation=100,
|
||||
override_storage_controller_generation=True,
|
||||
# NB: create the new tenant in the storage controller with the correct tenant config. This
|
||||
# will pick up the existing tenant data from remote storage. If we just attach it to the
|
||||
# Pageserver, the storage controller will reset the tenant config to the default.
|
||||
env.create_tenant(
|
||||
tenant_id=tenant,
|
||||
timeline_id=template_timeline,
|
||||
conf=template_config,
|
||||
)
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=22) as executor:
|
||||
|
||||
@@ -15,9 +15,7 @@ from fixtures.neon_fixtures import (
|
||||
)
|
||||
from fixtures.utils import get_scale_for_db, humantime_to_ms, skip_on_ci
|
||||
|
||||
from performance.pageserver.util import (
|
||||
setup_pageserver_with_tenants,
|
||||
)
|
||||
from performance.pageserver.util import setup_pageserver_with_tenants
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import Any
|
||||
@@ -126,14 +124,11 @@ def setup_and_run_pagebench_benchmark(
|
||||
for param, (value, kwargs) in params.items():
|
||||
record(param, metric_value=value, report=MetricReport.TEST_PARAM, **kwargs)
|
||||
|
||||
def setup_wrapper(env: NeonEnv):
|
||||
return setup_tenant_template(env, pg_bin, pgbench_scale)
|
||||
|
||||
env = setup_pageserver_with_tenants(
|
||||
neon_env_builder,
|
||||
f"max_throughput_latest_lsn-{n_tenants}-{pgbench_scale}",
|
||||
n_tenants,
|
||||
setup_wrapper,
|
||||
lambda env: setup_tenant_template(env, pg_bin, pgbench_scale),
|
||||
# https://github.com/neondatabase/neon/issues/8070
|
||||
timeout_in_seconds=60,
|
||||
)
|
||||
@@ -160,14 +155,8 @@ def setup_tenant_template(env: NeonEnv, pg_bin: PgBin, scale: int):
|
||||
"gc_period": "0s", # disable periodic gc
|
||||
"checkpoint_timeout": "10 years",
|
||||
"compaction_period": "0s", # disable periodic compaction
|
||||
"compaction_threshold": 10,
|
||||
"compaction_target_size": 134217728,
|
||||
"checkpoint_distance": 268435456,
|
||||
"image_creation_threshold": 3,
|
||||
}
|
||||
template_tenant, template_timeline = env.create_tenant(set_default=True)
|
||||
env.pageserver.tenant_detach(template_tenant)
|
||||
env.pageserver.tenant_attach(template_tenant, config)
|
||||
template_tenant, template_timeline = env.create_tenant(set_default=True, conf=config)
|
||||
ps_http = env.pageserver.http_client()
|
||||
with env.endpoints.create_start("main", tenant_id=template_tenant) as ep:
|
||||
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", "-I", "dtGvp", ep.connstr()])
|
||||
|
||||
@@ -166,6 +166,7 @@ def test_pageserver_compaction_preempt(
|
||||
|
||||
|
||||
@skip_in_debug_build("only run with release build")
|
||||
@pytest.mark.timeout(900) # This test is slow with sanitizers enabled, especially on ARM
|
||||
@pytest.mark.parametrize(
|
||||
"with_branches",
|
||||
["with_branches", "no_branches"],
|
||||
|
||||
@@ -782,6 +782,21 @@ def test_lsn_lease_storcon(neon_env_builder: NeonEnvBuilder):
|
||||
)
|
||||
|
||||
|
||||
def test_mark_invisible_storcon(neon_env_builder: NeonEnvBuilder):
|
||||
conf = {
|
||||
"pitr_interval": "0s",
|
||||
"gc_period": "0s",
|
||||
"compaction_period": "0s",
|
||||
}
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=conf)
|
||||
env.storage_controller.pageserver_api().timeline_mark_invisible(
|
||||
env.initial_tenant, env.initial_timeline
|
||||
)
|
||||
env.storage_controller.pageserver_api().timeline_mark_invisible(
|
||||
env.initial_tenant, env.initial_timeline, True
|
||||
)
|
||||
|
||||
|
||||
def insert_with_action(
|
||||
env: NeonEnv,
|
||||
tenant: TenantId,
|
||||
|
||||
Reference in New Issue
Block a user