Compare commits


21 Commits

Author SHA1 Message Date
BodoBolero
a3475286a9 try platform specific options 2024-06-15 11:54:07 +02:00
BodoBolero
9fc67363a9 as long as PR is not committed this still uses the old workflow 2024-06-15 11:50:14 +02:00
BodoBolero
cf4cdd6cd5 temporarily build only on x64 to test out x64 optflags 2024-06-15 11:31:43 +02:00
BodoBolero
b8940f1685 compare native with x86-64 2024-06-15 11:17:09 +02:00
BodoBolero
c5bc73fff0 test performance difference between generic binaries and optimized binaries 2024-06-15 10:18:42 +02:00
Peter Bendel
46210035c5 add halfvec indexing and queries to periodic pgvector performance tests (#8057)
## Problem

The halfvec data type was introduced in pgvector 0.7.0 and is popular
because it allows smaller vectors, smaller indexes, and potentially better
performance.

So far we have not tested halfvec in our periodic performance tests.
This PR adds halfvec indexing and halfvec queries to the test.
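
For illustration (not part of this PR's diff), a minimal halfvec sketch; the
table, column, and dimension below are hypothetical:

```
-- halfvec stores 2 bytes per dimension instead of 4
CREATE TABLE items (id bigserial PRIMARY KEY, embedding halfvec(3));
-- HNSW index using the half-precision L2 operator class
CREATE INDEX ON items USING hnsw (embedding halfvec_l2_ops);
-- nearest-neighbour query by L2 distance
SELECT id FROM items ORDER BY embedding <-> '[1,2,3]'::halfvec(3) LIMIT 10;
```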
2024-06-14 18:36:50 +02:00
Alex Chi Z
81892199f6 chore(pageserver): vectored get target_keyspace directly accums (#8055)
follow up on https://github.com/neondatabase/neon/pull/7904

avoid a layer of indirection introduced by `Vec<Range<Key>>`

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-06-14 11:57:58 -04:00
Alexander Bayandin
83eb02b07a CI: downgrade docker/setup-buildx-action (#8062)
## Problem

I've bumped `docker/setup-buildx-action` in #8042 because I wasn't able
to reproduce the issue from #7445.
But now the issue appears again in
https://github.com/neondatabase/neon/actions/runs/9514373620/job/26226626923?pr=8059
The steps to reproduce aren't clear; it probably requires
`docker/setup-buildx-action@v3` and rebuilding the image without cache.

## Summary of changes
- Downgrade `docker/setup-buildx-action@v3` to `docker/setup-buildx-action@v2`
2024-06-14 11:43:51 +00:00
Arseny Sher
a71f58e69c Fix test_segment_init_failure.
Graceful shutdown broke it.
2024-06-14 14:24:15 +03:00
Conrad Ludgate
e6eb0020a1 update rust to 1.79.0 (#8048)
## Problem

Rust 1.79 enables new lints by default.

## Summary of changes

* update to rust 1.79
* `s/default_features/default-features/`
* fix proxy dead code.
* fix pageserver dead code.
2024-06-14 13:23:52 +02:00
John Spray
eb0ca9b648 pageserver: improved synthetic size & find_gc_cutoff error handling (#8051)
## Problem

This PR refactors some error handling to avoid log spam on
tenant/timeline shutdown.

- "ignoring failure to find gc cutoffs: timeline shutting down." logs
(https://github.com/neondatabase/neon/issues/8012)
- "synthetic_size_worker: failed to calculate synthetic size for tenant
...: Failed to refresh gc_info before gathering inputs: tenant shutting
down", for example here:
https://neon-github-public-dev.s3.amazonaws.com/reports/pr-8049/9502988669/index.html#suites/3fc871d9ee8127d8501d607e03205abb/1a074a66548bbcea

Closes: https://github.com/neondatabase/neon/issues/8012

## Summary of changes

- Refactor: Add a PageReconstructError variant to GcError: this is the
only kind of error that find_gc_cutoffs can emit.
- Functional change: only ignore the shutdown PageReconstructError variant;
for other variants, treat it as a real error
- Refactor: add a structured CalculateSyntheticSizeError type and use it
instead of anyhow::Error in synthetic size calculations
- Functional change: while iterating through timelines gathering logical
sizes, only drop out if the whole tenant is cancelled: individual
timeline cancellations indicate deletion in progress and we can just
ignore those.
2024-06-14 11:08:11 +01:00
John Spray
6843fd8f89 storage controller: always wait for tenant detach before delete (#8049)
## Problem

This test could fail with a timeout waiting for tenant deletions.

Tenant deletions could get tripped up on nodes transitioning from
offline to online at the moment of the deletion. In a previous
reconciliation, the reconciler would skip detaching a particular
location because the node was offline, but then when we do the delete
the node is marked as online and can be picked as the node to use for
issuing a deletion request. This hits the "Unexpectedly still attached"
path, which would still work if the caller kept calling DELETE, but if
a caller does a DELETE, GET, GET, GET poll, it doesn't work because
the GET calls fail after we've marked the tenant as detached.

## Summary of changes

Fix the undesirable storage controller behavior highlighted by this test
failure:
- Change tenant deletion flow to _always_ wait for reconciliation to
succeed: it was unsound to proceed and return 202 if something was still
attached, because after the 202 callers can no longer GET the tenant.

Stabilize the test:
- Add a reconcile_until_idle to the test, so that it will not have
reconciliations running in the background while we mark a node online.
This test is not meant to be a chaos test: we should test that kind of
complexity elsewhere.
- This reconcile_until_idle also fixes another failure mode where the
test might see a None for a tenant location because a reconcile was
mutating it
(https://neon-github-public-dev.s3.amazonaws.com/reports/pr-7288/9500177581/index.html#suites/8fc5d1648d2225380766afde7c428d81/4acece42ae00c442/)

It remains the case that a motivated tester could produce a situation
where a DELETE gives a 500, when exactly the wrong node transitions
from offline to available at the precise moment of a deletion (but the
500 is better than returning 202 and then failing all subsequent GETs).
Note that nodes don't go through the offline state during normal
restarts, so this is super rare. We should eventually fix this by making
DELETE to the pageserver implicitly detach the tenant if it's attached,
but that should wait until nobody is using the legacy-style deletes (the
ones that use 202 + polling).
2024-06-14 10:37:30 +01:00
Alexander Bayandin
edc900028e CI: Update outdated GitHub Actions (#8042)
## Problem
We have a number of outdated actions in the CI pipeline, and GitHub
complains about some of them.

## Summary of changes
- Update `actions/checkout@1` (a really old one) in
`vm-compute-node-image`
- Update `actions/checkout@3` in `build-build-tools-image`
- Update `docker/setup-buildx-action` in all workflows / jobs; it was
downgraded in https://github.com/neondatabase/neon/pull/7445, but it
seems to work fine now
2024-06-14 10:24:13 +01:00
Heikki Linnakangas
789196572e Fix test_replica_query_race flakiness (#8038)
This failed once with `relation "test" does not exist` when trying to
run the query on the standby. It's possible that the standby is started
before the CREATE TABLE is processed in the pageserver, and the standby
opens up for queries before it has received the CREATE TABLE transaction
from the primary. To fix, wait for the standby to catch up to the
primary before starting to run the queries.
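
A sketch of that catch-up check in plain SQL, assuming direct connections to
both nodes (the test presumably uses a harness helper; the LSN literal below
is a placeholder):

```
-- on the primary: capture the current WAL flush position
SELECT pg_current_wal_flush_lsn();
-- on the standby: poll until replay has reached that position
SELECT pg_last_wal_replay_lsn() >= '0/1553788'::pg_lsn AS caught_up;
```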


https://neon-github-public-dev.s3.amazonaws.com/reports/pr-8025/9483658488/index.html
2024-06-14 11:51:12 +03:00
John Spray
425eed24e8 pageserver: refine shutdown handling in secondary download (#8052)
## Problem

Some code paths during secondary mode download return Ok() rather
than UpdateError::Cancelled. This is functionally okay, but
TenantDownloader::download ends with a sanity check that the
progress is 100% on success, and prints a "Correcting drift..." warning
if not. This warning can be emitted in a test, e.g.
https://neon-github-public-dev.s3.amazonaws.com/reports/pr-8049/9503642976/index.html#/testresult/fff1624ba6adae9e.

## Summary of changes

- In secondary download cancellation paths, use
Err(UpdateError::Cancelled) rather than Ok(), so that we drop out of the
download function and do not reach the progress sanity check.
2024-06-14 09:39:31 +01:00
James Broadhead
f67010109f extensions: pgvector-0.7.2 (#8037)
Update pgvector to 0.7.2

A purely mechanical update to pgvector.patch, just as a place to start
from.
2024-06-14 10:17:43 +02:00
Tristan Partin
0c3e3a8667 Set application_name for internal connections to computes
This will help when analyzing the origins of connections to a compute,
as in [0].

[0]: https://github.com/neondatabase/cloud/issues/14247
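
For illustration (not part of this change), the new name is visible from
inside the compute via pg_stat_activity; a hedged sketch, where only the
vm-monitor value actually appears in this commit's diff:

```
-- list internal connections by the application_name they report
SELECT application_name, count(*)
FROM pg_stat_activity
GROUP BY application_name
ORDER BY application_name;
```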
2024-06-13 12:06:10 -07:00
Christian Schwarz
82719542c6 fix: vectored get returns incorrect result on inexact materialized page cache hit (#8050)
# Problem

Suppose our vectored get starts with an inexact materialized page cache
hit ("cached lsn") that is shadowed by a newer image layer.
Like so:


```
    <inmemory layers>

    +-+ < delta layer
    | |
   -|-|----- < image layer
    | |
    | |
   -|-|----- < cached lsn for requested key
    +_+
```

The correct visitation order is
1. inmemory layers
2. delta layer records in LSN range `[image_layer.lsn,
oldest_inmemory_layer.lsn_range.start)`
3. image layer

However, when the vectored get code visits the delta layer, it
(incorrectly!) returns with state `Complete`.

The reason it returns is that it calls `on_lsn_advanced` with
`self.lsn_range.start`, i.e., the start of the layer's LSN range.

Instead, it should use `lsn_range.start`, i.e., the LSN range from the
correct visitation order listed above.

# Solution

Use `lsn_range.start` instead of `self.lsn_range.start`.

# Refs

discovered by & fixes https://github.com/neondatabase/neon/issues/6967

Co-authored-by: Vlad Lazar <vlad@neon.tech>
2024-06-13 18:20:47 +00:00
Alex Chi Z
d25f7e3dd5 test(pageserver): add test wal record for unit testing (#8015)
https://github.com/neondatabase/neon/issues/8002

We need a mock WAL record to make it easier to write unit tests. This pull
request adds such a record. It has a `clear` flag and an `append` field. The
tests for legacy-enhanced compaction are not modified yet and will be
part of the next pull request.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-06-13 09:44:37 -04:00
Anna Khanova
fbccd1e676 Proxy process updated errors (#8026)
## Problem

Respect error classification from cplane
2024-06-13 14:42:26 +02:00
Heikki Linnakangas
dc2ab4407f Fix on-demand SLRU download on standby starting at WAL segment boundary (#8031)
If a standby is started right after switching to a new WAL segment, the
request LSN in the SLRU download request would point to the beginning of
the segment (e.g. 0/5000000), while the not-modified-since LSN would point
to just after the page header (e.g. 0/5000028). It's effectively the
same position, as there cannot be any WAL records in between, but the
pageserver rightly errors out on any request where the request LSN <
not-modified-since LSN.

To fix, round down the not-modified-since LSN to the beginning of the
page, like the request LSN.
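
A worked illustration of that rounding in SQL (requires PostgreSQL 14+ for
pg_lsn arithmetic; the actual fix is in the C code further down, not this):

```
-- the two LSNs differ only by the long page header (0x28 = 40 bytes)
SELECT '0/5000028'::pg_lsn - '0/5000000'::pg_lsn;       -- 40
-- round the not-modified-since LSN down to the 8 KiB WAL page boundary
SELECT '0/5000028'::pg_lsn
     - (('0/5000028'::pg_lsn - '0/0'::pg_lsn) % 8192);  -- 0/5000000
```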

Fixes issue #8030
2024-06-13 00:31:31 +03:00
52 changed files with 807 additions and 486 deletions

View File

@@ -99,7 +99,7 @@ jobs:
# Set --sparse-ordering option of pytest-order plugin
# to ensure tests are running in the order they appear in the file.
# It's important for test_perf_pgbench.py::test_pgbench_remote_* tests
extra_params: -m remote_cluster --sparse-ordering --timeout 5400 --ignore test_runner/performance/test_perf_olap.py
extra_params: -m remote_cluster --sparse-ordering --timeout 5400 --ignore test_runner/performance/test_perf_olap.py --ignore test_runner/performance/test_perf_pgvector_queries.py
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -410,14 +410,14 @@ jobs:
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
- name: Benchmark pgvector hnsw queries
- name: Benchmark pgvector queries
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
test_selection: performance/test_perf_pgvector_queries.py
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_pgvector
extra_params: -m remote_cluster --timeout 21600
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"

View File

@@ -55,7 +55,7 @@ jobs:
exit 1
fi
- uses: actions/checkout@v3
- uses: actions/checkout@v4
# Use custom DOCKER_CONFIG directory to avoid conflicts with default settings
# The default value is ~/.docker

View File

@@ -858,7 +858,7 @@ jobs:
cache-to: type=registry,ref=neondatabase/compute-node-${{ matrix.version }}:cache-${{ matrix.arch }},mode=max
tags: |
neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.arch }}
- name: Build neon extensions test image
if: matrix.version == 'v16'
uses: docker/build-push-action@v5
@@ -965,7 +965,7 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v1
uses: actions/checkout@v4
with:
fetch-depth: 0

View File

@@ -120,7 +120,7 @@ num_cpus = "1.15"
num-traits = "0.2.15"
once_cell = "1.13"
opentelemetry = "0.20.0"
opentelemetry-otlp = { version = "0.13.0", default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-otlp = { version = "0.13.0", default-features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-semantic-conventions = "0.12.0"
parking_lot = "0.12"
parquet = { version = "51.0.0", default-features = false, features = ["zstd"] }
@@ -128,7 +128,7 @@ parquet_derive = "51.0.0"
pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
pin-project-lite = "0.2"
procfs = "0.14"
prometheus = {version = "0.13", default_features=false, features = ["process"]} # removes protobuf dependency
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
prost = "0.11"
rand = "0.8"
redis = { version = "0.25.2", features = ["tokio-rustls-comp", "keep-alive"] }
@@ -184,7 +184,7 @@ tower-service = "0.3.2"
tracing = "0.1"
tracing-error = "0.2.0"
tracing-opentelemetry = "0.21.0"
tracing-subscriber = { version = "0.3", default_features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json", "ansi"] }
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json", "ansi"] }
twox-hash = { version = "1.6.3", default-features = false }
url = "2.2"
urlencoding = "2.1"

View File

@@ -141,7 +141,7 @@ WORKDIR /home/nonroot
# Rust
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
ENV RUSTC_VERSION=1.78.0
ENV RUSTC_VERSION=1.79.0
ENV RUSTUP_HOME="/home/nonroot/.rustup"
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && whoami && \

View File

@@ -246,12 +246,17 @@ COPY patches/pgvector.patch /pgvector.patch
# By default, pgvector Makefile uses `-march=native`. We don't want that,
# because we build the images on different machines than where we run them.
# Pass OPTFLAGS="" to remove it.
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.7.1.tar.gz -O pgvector.tar.gz && \
echo "fe6c8cb4e0cd1a8cb60f5badf9e1701e0fcabcfc260931c26d01e155c4dd21d1 pgvector.tar.gz" | sha256sum --check && \
RUN if [ "$(uname -m)" = "x86_64" ]; then \
OPTFLAGS=" -march=x86-64 "; \
elif [ "$(uname -m)" = "aarch64" ]; then \
OPTFLAGS=""; \
fi && \
wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.7.2.tar.gz -O pgvector.tar.gz && \
echo "617fba855c9bcb41a2a9bc78a78567fd2e147c72afd5bf9d37b31b9591632b30 pgvector.tar.gz" | sha256sum --check && \
mkdir pgvector-src && cd pgvector-src && tar xzf ../pgvector.tar.gz --strip-components=1 -C . && \
patch -p1 < /pgvector.patch && \
make -j $(getconf _NPROCESSORS_ONLN) OPTFLAGS="" PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) OPTFLAGS="" install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) OPTFLAGS="$OPTFLAGS" PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) OPTFLAGS="$OPTFLAGS" install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/vector.control
#########################################################################################
@@ -979,7 +984,7 @@ RUN cd /ext-src/ && for f in *.tar.gz; \
do echo $f; dname=$(echo $f | sed 's/\.tar.*//')-src; \
rm -rf $dname; mkdir $dname; tar xzf $f --strip-components=1 -C $dname \
|| exit 1; rm -f $f; done
RUN cd /ext-src/pgvector-src && patch -p1 <../pgvector.patch
RUN cd /ext-src/pgvector-src && patch -p1 <../pgvector.patch
# cmake is required for the h3 test
RUN apt-get update && apt-get install -y cmake
RUN patch -p1 < /ext-src/pg_hintplan.patch

View File

@@ -735,7 +735,7 @@ fn cli() -> clap::Command {
Arg::new("filecache-connstr")
.long("filecache-connstr")
.default_value(
"host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable",
"host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable application_name=vm-monitor",
)
.value_name("FILECACHE_CONNSTR"),
)

View File

@@ -4,10 +4,6 @@ version = "0.1.0"
edition.workspace = true
license.workspace = true
[features]
default = []
testing = [ "pageserver_api/testing" ]
[dependencies]
anyhow.workspace = true
async-trait.workspace = true

View File

@@ -383,12 +383,6 @@ impl PageServerNode {
.map(|x| x.parse::<AuxFilePolicy>())
.transpose()
.context("Failed to parse 'switch_aux_file_policy'")?,
#[cfg(feature = "testing")]
test_vm_bit_debug_logging: settings
.remove("test_vm_bit_debug_logging")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'test_vm_bit_debug_logging' as bool")?,
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")
@@ -512,12 +506,6 @@ impl PageServerNode {
.map(|x| x.parse::<AuxFilePolicy>())
.transpose()
.context("Failed to parse 'switch_aux_file_policy'")?,
#[cfg(feature = "testing")]
test_vm_bit_debug_logging: settings
.remove("test_vm_bit_debug_logging")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'test_vm_bit_debug_logging' as bool")?,
}
};

View File

@@ -4,10 +4,6 @@ version = "0.1.0"
edition.workspace = true
license.workspace = true
[features]
default = []
testing = []
[dependencies]
serde.workspace = true
serde_with.workspace = true

View File

@@ -558,6 +558,12 @@ impl KeySpaceRandomAccum {
self.ranges.push(range);
}
pub fn add_keyspace(&mut self, keyspace: KeySpace) {
for range in keyspace.ranges {
self.add_range(range);
}
}
pub fn to_keyspace(mut self) -> KeySpace {
let mut ranges = Vec::new();
if !self.ranges.is_empty() {

View File

@@ -322,8 +322,6 @@ pub struct TenantConfig {
pub timeline_get_throttle: Option<ThrottleConfig>,
pub image_layer_creation_check_threshold: Option<u8>,
pub switch_aux_file_policy: Option<AuxFilePolicy>,
#[cfg(feature = "testing")]
pub test_vm_bit_debug_logging: Option<bool>,
}
/// The policy for the aux file storage. It can be switched through `switch_aux_file_policy`

View File

@@ -7,7 +7,7 @@ license.workspace = true
[dependencies]
hyper.workspace = true
opentelemetry = { workspace = true, features=["rt-tokio"] }
opentelemetry-otlp = { workspace = true, default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-otlp = { workspace = true, default-features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-semantic-conventions.workspace = true
reqwest = { workspace = true, default-features = false, features = ["rustls-tls"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }

View File

@@ -8,7 +8,7 @@ license.workspace = true
default = []
# Enables test-only APIs, including failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints", "pageserver_api/testing"]
testing = ["fail/failpoints"]
[dependencies]
anyhow.workspace = true

View File

@@ -2,10 +2,9 @@
//! and push them to a HTTP endpoint.
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::size::CalculateSyntheticSizeError;
use crate::tenant::tasks::BackgroundLoopKind;
use crate::tenant::{
mgr::TenantManager, LogicalSizeCalculationCause, PageReconstructError, Tenant,
};
use crate::tenant::{mgr::TenantManager, LogicalSizeCalculationCause, Tenant};
use camino::Utf8PathBuf;
use consumption_metrics::EventType;
use pageserver_api::models::TenantState;
@@ -350,19 +349,12 @@ async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &Re
// Same for the loop that fetches computed metrics.
// By using the same limiter, we centralize metrics collection for "start" and "finished" counters,
// which turns out is really handy to understand the system.
let Err(e) = tenant.calculate_synthetic_size(CAUSE, cancel, ctx).await else {
return;
};
// this error can be returned if timeline is shutting down, but it does not
// mean the synthetic size worker should terminate.
let shutting_down = matches!(
e.downcast_ref::<PageReconstructError>(),
Some(PageReconstructError::Cancelled)
);
if !shutting_down {
let tenant_shard_id = tenant.tenant_shard_id();
error!("failed to calculate synthetic size for tenant {tenant_shard_id}: {e:#}");
match tenant.calculate_synthetic_size(CAUSE, cancel, ctx).await {
Ok(_) => {}
Err(CalculateSyntheticSizeError::Cancelled) => {}
Err(e) => {
let tenant_shard_id = tenant.tenant_shard_id();
error!("failed to calculate synthetic size for tenant {tenant_shard_id}: {e:#}");
}
}
}

View File

@@ -1135,7 +1135,10 @@ async fn tenant_size_handler(
&ctx,
)
.await
.map_err(ApiError::InternalServerError)?;
.map_err(|e| match e {
crate::tenant::size::CalculateSyntheticSizeError::Cancelled => ApiError::ShuttingDown,
other => ApiError::InternalServerError(anyhow::anyhow!(other)),
})?;
let mut sizes = None;
let accepts_html = headers
@@ -1143,9 +1146,7 @@ async fn tenant_size_handler(
.map(|v| v == "text/html")
.unwrap_or_default();
if !inputs_only.unwrap_or(false) {
let storage_model = inputs
.calculate_model()
.map_err(ApiError::InternalServerError)?;
let storage_model = inputs.calculate_model();
let size = storage_model.calculate();
// If request header expects html, return html

View File

@@ -509,11 +509,24 @@ pub(crate) enum GcError {
#[error(transparent)]
Remote(anyhow::Error),
// An error reading while calculating GC cutoffs
#[error(transparent)]
GcCutoffs(PageReconstructError),
// If GC was invoked for a particular timeline, this error means it didn't exist
#[error("timeline not found")]
TimelineNotFound,
}
impl From<PageReconstructError> for GcError {
fn from(value: PageReconstructError) -> Self {
match value {
PageReconstructError::Cancelled => Self::TimelineCancelled,
other => Self::GcCutoffs(other),
}
}
}
impl Tenant {
/// Yet another helper for timeline initialization.
///
@@ -1033,7 +1046,6 @@ impl Tenant {
remote_metadata,
TimelineResources {
remote_client,
deletion_queue_client: self.deletion_queue_client.clone(),
timeline_get_throttle: self.timeline_get_throttle.clone(),
},
ctx,
@@ -1059,7 +1071,6 @@ impl Tenant {
timeline_id,
&index_part.metadata,
remote_timeline_client,
self.deletion_queue_client.clone(),
)
.instrument(tracing::info_span!("timeline_delete", %timeline_id))
.await
@@ -2921,17 +2932,9 @@ impl Tenant {
.checked_sub(horizon)
.unwrap_or(Lsn(0));
let res = timeline.find_gc_cutoffs(cutoff, pitr, cancel, ctx).await;
match res {
Ok(cutoffs) => {
let old = gc_cutoffs.insert(timeline.timeline_id, cutoffs);
assert!(old.is_none());
}
Err(e) => {
tracing::warn!(timeline_id = %timeline.timeline_id, "ignoring failure to find gc cutoffs: {e:#}");
}
}
let cutoffs = timeline.find_gc_cutoffs(cutoff, pitr, cancel, ctx).await?;
let old = gc_cutoffs.insert(timeline.timeline_id, cutoffs);
assert!(old.is_none());
}
if !self.is_active() || self.cancel.is_cancelled() {
@@ -3443,7 +3446,6 @@ impl Tenant {
);
TimelineResources {
remote_client,
deletion_queue_client: self.deletion_queue_client.clone(),
timeline_get_throttle: self.timeline_get_throttle.clone(),
}
}
@@ -3553,7 +3555,7 @@ impl Tenant {
cause: LogicalSizeCalculationCause,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<size::ModelInputs> {
) -> Result<size::ModelInputs, size::CalculateSyntheticSizeError> {
let logical_sizes_at_once = self
.conf
.concurrent_tenant_size_logical_size_queries
@@ -3568,8 +3570,8 @@ impl Tenant {
// See more for on the issue #2748 condenced out of the initial PR review.
let mut shared_cache = tokio::select! {
locked = self.cached_logical_sizes.lock() => locked,
_ = cancel.cancelled() => anyhow::bail!("cancelled"),
_ = self.cancel.cancelled() => anyhow::bail!("tenant is shutting down"),
_ = cancel.cancelled() => return Err(size::CalculateSyntheticSizeError::Cancelled),
_ = self.cancel.cancelled() => return Err(size::CalculateSyntheticSizeError::Cancelled),
};
size::gather_inputs(
@@ -3593,10 +3595,10 @@ impl Tenant {
cause: LogicalSizeCalculationCause,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<u64> {
) -> Result<u64, size::CalculateSyntheticSizeError> {
let inputs = self.gather_size_inputs(None, cause, cancel, ctx).await?;
let size = inputs.calculate()?;
let size = inputs.calculate();
self.set_cached_synthetic_size(size);
@@ -3831,8 +3833,6 @@ pub(crate) mod harness {
tenant_conf.image_layer_creation_check_threshold,
),
switch_aux_file_policy: Some(tenant_conf.switch_aux_file_policy),
#[cfg(feature = "testing")]
test_vm_bit_debug_logging: Some(tenant_conf.test_vm_bit_debug_logging),
}
}
}
@@ -4043,6 +4043,7 @@ mod tests {
use crate::repository::{Key, Value};
use crate::tenant::harness::*;
use crate::tenant::timeline::CompactFlags;
use crate::walrecord::NeonWalRecord;
use crate::DEFAULT_PG_VERSION;
use bytes::{Bytes, BytesMut};
use hex_literal::hex;
@@ -6366,7 +6367,7 @@ mod tests {
.await?;
Ok(res.pop_last().map(|(k, v)| {
assert_eq!(k, key);
v.unwrap().0
v.unwrap()
}))
}
@@ -6707,8 +6708,8 @@ mod tests {
}
#[tokio::test]
async fn test_simple_bottom_most_compaction() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_simple_bottom_most_compaction")?;
async fn test_simple_bottom_most_compaction_images() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_simple_bottom_most_compaction_images")?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {
@@ -6863,4 +6864,79 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_neon_test_record() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_neon_test_record")?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {
// using aux key here b/c they are guaranteed to be inside `collect_keyspace`.
let mut key = Key::from_hex("620000000033333333444444445500000000").unwrap();
key.field6 = id;
key
}
let delta1 = vec![
(
get_key(1),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append(",0x20")),
),
(
get_key(1),
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append(",0x30")),
),
(get_key(2), Lsn(0x10), Value::Image("0x10".into())),
(
get_key(2),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append(",0x20")),
),
(
get_key(2),
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append(",0x30")),
),
(get_key(3), Lsn(0x10), Value::Image("0x10".into())),
(
get_key(3),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_clear()),
),
(get_key(4), Lsn(0x10), Value::Image("0x10".into())),
(
get_key(4),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_init()),
),
];
let image1 = vec![(get_key(1), "0x10".into())];
let tline = tenant
.create_test_timeline_with_layers(
TIMELINE_ID,
Lsn(0x10),
DEFAULT_PG_VERSION,
&ctx,
vec![delta1], // delta layers
vec![(Lsn(0x10), image1)], // image layers
Lsn(0x50),
)
.await?;
assert_eq!(
tline.get(get_key(1), Lsn(0x50), &ctx).await?,
Bytes::from_static(b"0x10,0x20,0x30")
);
assert_eq!(
tline.get(get_key(2), Lsn(0x50), &ctx).await?,
Bytes::from_static(b"0x10,0x20,0x30")
);
// assert_eq!(tline.get(get_key(3), Lsn(0x50), &ctx).await?, Bytes::new());
// assert_eq!(tline.get(get_key(4), Lsn(0x50), &ctx).await?, Bytes::new());
Ok(())
}
}

View File

@@ -377,9 +377,6 @@ pub struct TenantConf {
/// There is a `last_aux_file_policy` flag which gets persisted in `index_part.json` once the first aux
/// file is written.
pub switch_aux_file_policy: AuxFilePolicy,
#[cfg(feature = "testing")]
pub test_vm_bit_debug_logging: bool,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -479,11 +476,6 @@ pub struct TenantConfOpt {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub switch_aux_file_policy: Option<AuxFilePolicy>,
#[cfg(feature = "testing")]
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub test_vm_bit_debug_logging: Option<bool>,
}
impl TenantConfOpt {
@@ -546,10 +538,6 @@ impl TenantConfOpt {
switch_aux_file_policy: self
.switch_aux_file_policy
.unwrap_or(global_conf.switch_aux_file_policy),
#[cfg(feature = "testing")]
test_vm_bit_debug_logging: self
.test_vm_bit_debug_logging
.unwrap_or(global_conf.test_vm_bit_debug_logging),
}
}
}
@@ -594,8 +582,6 @@ impl Default for TenantConf {
timeline_get_throttle: crate::tenant::throttle::Config::disabled(),
image_layer_creation_check_threshold: DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD,
switch_aux_file_policy: AuxFilePolicy::default_tenant_config(),
#[cfg(feature = "testing")]
test_vm_bit_debug_logging: false,
}
}
}
@@ -671,8 +657,6 @@ impl From<TenantConfOpt> for models::TenantConfig {
timeline_get_throttle: value.timeline_get_throttle.map(ThrottleConfig::from),
image_layer_creation_check_threshold: value.image_layer_creation_check_threshold,
switch_aux_file_policy: value.switch_aux_file_policy,
#[cfg(feature = "testing")]
test_vm_bit_debug_logging: value.test_vm_bit_debug_logging,
}
}
}

View File

@@ -513,7 +513,7 @@ impl<'a> TenantDownloader<'a> {
// cover our access to local storage.
let Ok(_guard) = self.secondary_state.gate.enter() else {
// Shutting down
return Ok(());
return Err(UpdateError::Cancelled);
};
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
@@ -846,7 +846,7 @@ impl<'a> TenantDownloader<'a> {
for layer in timeline.layers {
if self.secondary_state.cancel.is_cancelled() {
tracing::debug!("Cancelled -- dropping out of layer loop");
return Ok(());
return Err(UpdateError::Cancelled);
}
// Existing on-disk layers: just update their access time.

View File

@@ -3,7 +3,6 @@ use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use anyhow::{bail, Context};
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
@@ -11,7 +10,7 @@ use tokio_util::sync::CancellationToken;
use crate::context::RequestContext;
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
use super::{LogicalSizeCalculationCause, Tenant};
use super::{GcError, LogicalSizeCalculationCause, Tenant};
use crate::tenant::Timeline;
use utils::id::TimelineId;
use utils::lsn::Lsn;
@@ -43,6 +42,44 @@ pub struct SegmentMeta {
pub kind: LsnKind,
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum CalculateSyntheticSizeError {
/// Something went wrong internally to the calculation of logical size at a particular branch point
#[error("Failed to calculated logical size on timeline {timeline_id} at {lsn}: {error}")]
LogicalSize {
timeline_id: TimelineId,
lsn: Lsn,
error: CalculateLogicalSizeError,
},
/// Something went wrong internally when calculating GC parameters at start of size calculation
#[error(transparent)]
GcInfo(GcError),
/// Totally unexpected errors, like panics joining a task
#[error(transparent)]
Fatal(anyhow::Error),
/// The LSN we are trying to calculate a size at no longer exists at the point we query it
#[error("Could not find size at {lsn} in timeline {timeline_id}")]
LsnNotFound { timeline_id: TimelineId, lsn: Lsn },
/// Tenant shut down while calculating size
#[error("Cancelled")]
Cancelled,
}
impl From<GcError> for CalculateSyntheticSizeError {
fn from(value: GcError) -> Self {
match value {
GcError::TenantCancelled | GcError::TimelineCancelled => {
CalculateSyntheticSizeError::Cancelled
}
other => CalculateSyntheticSizeError::GcInfo(other),
}
}
}
impl SegmentMeta {
fn size_needed(&self) -> bool {
match self.kind {
@@ -116,12 +153,9 @@ pub(super) async fn gather_inputs(
cause: LogicalSizeCalculationCause,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<ModelInputs> {
) -> Result<ModelInputs, CalculateSyntheticSizeError> {
// refresh is needed to update gc related pitr_cutoff and horizon_cutoff
tenant
.refresh_gc_info(cancel, ctx)
.await
.context("Failed to refresh gc_info before gathering inputs")?;
tenant.refresh_gc_info(cancel, ctx).await?;
// Collect information about all the timelines
let mut timelines = tenant.list_timelines();
@@ -327,6 +361,12 @@ pub(super) async fn gather_inputs(
)
.await?;
if tenant.cancel.is_cancelled() {
// If we're shutting down, return an error rather than a sparse result that might include some
// timelines from before we started shutting down
return Err(CalculateSyntheticSizeError::Cancelled);
}
Ok(ModelInputs {
segments,
timeline_inputs,
@@ -345,7 +385,7 @@ async fn fill_logical_sizes(
logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
cause: LogicalSizeCalculationCause,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> Result<(), CalculateSyntheticSizeError> {
let timeline_hash: HashMap<TimelineId, Arc<Timeline>> = HashMap::from_iter(
timelines
.iter()
@@ -387,7 +427,7 @@ async fn fill_logical_sizes(
}
// Perform the size lookups
let mut have_any_error = false;
let mut have_any_error = None;
while let Some(res) = joinset.join_next().await {
// each of these come with Result<anyhow::Result<_>, JoinError>
// because of spawn + spawn_blocking
@@ -398,21 +438,36 @@ async fn fill_logical_sizes(
Err(join_error) => {
// cannot really do anything, as this panic is likely a bug
error!("task that calls spawn_ondemand_logical_size_calculation panicked: {join_error:#}");
have_any_error = true;
have_any_error = Some(CalculateSyntheticSizeError::Fatal(
anyhow::anyhow!(join_error)
.context("task that calls spawn_ondemand_logical_size_calculation"),
));
}
Ok(Err(recv_result_error)) => {
// cannot really do anything, as this panic is likely a bug
error!("failed to receive logical size query result: {recv_result_error:#}");
have_any_error = true;
have_any_error = Some(CalculateSyntheticSizeError::Fatal(
anyhow::anyhow!(recv_result_error)
.context("Receiving logical size query result"),
));
}
Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error)))) => {
if !matches!(error, CalculateLogicalSizeError::Cancelled) {
if matches!(error, CalculateLogicalSizeError::Cancelled) {
// Skip this: it's okay if one timeline among many is shutting down while we
// calculate inputs for the overall tenant.
continue;
} else {
warn!(
timeline_id=%timeline.timeline_id,
"failed to calculate logical size at {lsn}: {error:#}"
);
have_any_error = Some(CalculateSyntheticSizeError::LogicalSize {
timeline_id: timeline.timeline_id,
lsn,
error,
});
}
have_any_error = true;
}
Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size)))) => {
debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated");
@@ -426,10 +481,10 @@ async fn fill_logical_sizes(
// prune any keys not needed anymore; we record every used key and added key.
logical_size_cache.retain(|key, _| sizes_needed.contains_key(key));
if have_any_error {
if let Some(error) = have_any_error {
// we cannot complete this round, because we are missing data.
// we have however cached all we were able to request calculation on.
anyhow::bail!("failed to calculate some logical_sizes");
return Err(error);
}
// Insert the looked up sizes to the Segments
@@ -444,32 +499,29 @@ async fn fill_logical_sizes(
if let Some(Some(size)) = sizes_needed.get(&(timeline_id, lsn)) {
seg.segment.size = Some(*size);
} else {
bail!("could not find size at {} in timeline {}", lsn, timeline_id);
return Err(CalculateSyntheticSizeError::LsnNotFound { timeline_id, lsn });
}
}
Ok(())
}
impl ModelInputs {
pub fn calculate_model(&self) -> anyhow::Result<tenant_size_model::StorageModel> {
pub fn calculate_model(&self) -> tenant_size_model::StorageModel {
// Convert SegmentMetas into plain Segments
let storage = StorageModel {
StorageModel {
segments: self
.segments
.iter()
.map(|seg| seg.segment.clone())
.collect(),
};
Ok(storage)
}
}
// calculate total project size
pub fn calculate(&self) -> anyhow::Result<u64> {
let storage = self.calculate_model()?;
pub fn calculate(&self) -> u64 {
let storage = self.calculate_model();
let sizes = storage.calculate();
Ok(sizes.total_size)
sizes.total_size
}
}
@@ -656,7 +708,7 @@ fn verify_size_for_multiple_branches() {
"#;
let inputs: ModelInputs = serde_json::from_str(doc).unwrap();
assert_eq!(inputs.calculate().unwrap(), 37_851_408);
assert_eq!(inputs.calculate(), 37_851_408);
}
#[test]
@@ -711,7 +763,7 @@ fn verify_size_for_one_branch() {
let model: ModelInputs = serde_json::from_str(doc).unwrap();
let res = model.calculate_model().unwrap().calculate();
let res = model.calculate_model().calculate();
println!("calculated synthetic size: {}", res.total_size);
println!("result: {:?}", serde_json::to_string(&res.segments));

View File

@@ -73,7 +73,7 @@ where
/// the same ValueReconstructState struct in the next 'get_value_reconstruct_data'
/// call, to collect more records.
///
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[derive(Debug, Default)]
pub struct ValueReconstructState {
pub records: Vec<(Lsn, NeonWalRecord)>,
pub img: Option<(Lsn, Bytes)>,
@@ -318,7 +318,7 @@ pub(crate) struct LayerFringe {
#[derive(Debug)]
struct LayerKeyspace {
layer: ReadableLayer,
target_keyspace: Vec<KeySpace>,
target_keyspace: KeySpaceRandomAccum,
}
impl LayerFringe {
@@ -342,17 +342,13 @@ impl LayerFringe {
_,
LayerKeyspace {
layer,
target_keyspace,
mut target_keyspace,
},
)) => {
let mut keyspace = KeySpaceRandomAccum::new();
for ks in target_keyspace {
for part in ks.ranges {
keyspace.add_range(part);
}
}
Some((layer, keyspace.consume_keyspace(), read_desc.lsn_range))
}
)) => Some((
layer,
target_keyspace.consume_keyspace(),
read_desc.lsn_range,
)),
None => unreachable!("fringe internals are always consistent"),
}
}
@@ -367,16 +363,18 @@ impl LayerFringe {
let entry = self.layers.entry(layer_id.clone());
match entry {
Entry::Occupied(mut entry) => {
entry.get_mut().target_keyspace.push(keyspace);
entry.get_mut().target_keyspace.add_keyspace(keyspace);
}
Entry::Vacant(entry) => {
self.planned_reads_by_lsn.push(ReadDesc {
lsn_range,
layer_id: layer_id.clone(),
});
let mut accum = KeySpaceRandomAccum::new();
accum.add_keyspace(keyspace);
entry.insert(LayerKeyspace {
layer,
target_keyspace: vec![keyspace],
target_keyspace: accum,
});
}
}

View File

@@ -219,7 +219,6 @@ pub struct DeltaLayerInner {
// values copied from summary
index_start_blk: u32,
index_root_blk: u32,
lsn_range: Range<Lsn>,
file: VirtualFile,
file_id: FileId,
@@ -785,7 +784,6 @@ impl DeltaLayerInner {
file_id,
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
lsn_range: actual_summary.lsn_range,
max_vectored_read_bytes,
}))
}
@@ -822,7 +820,6 @@ impl DeltaLayerInner {
if entry_lsn < lsn_range.start {
return false;
}
assert!(entry_lsn <= search_key.lsn(), "certain because of how backwards visit direction works");
offsets.push((entry_lsn, blob_ref.pos()));
!blob_ref.will_init()
@@ -912,7 +909,7 @@ impl DeltaLayerInner {
let reads = Self::plan_reads(
&keyspace,
lsn_range,
lsn_range.clone(),
data_end_offset,
index_reader,
planner,
@@ -925,7 +922,7 @@ impl DeltaLayerInner {
self.do_reads_and_update_state(reads, reconstruct_state, ctx)
.await;
reconstruct_state.on_lsn_advanced(&keyspace, self.lsn_range.start);
reconstruct_state.on_lsn_advanced(&keyspace, lsn_range.start);
Ok(())
}

View File

@@ -62,6 +62,7 @@ use std::{
ops::ControlFlow,
};
use crate::metrics::GetKind;
use crate::pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS;
use crate::{
aux_file::AuxFileSizeEstimator,
@@ -75,7 +76,6 @@ use crate::{
disk_usage_eviction_task::DiskUsageEvictionInfo,
pgdatadir_mapping::CollectKeySpaceError,
};
use crate::{deletion_queue::DeletionQueueClient, metrics::GetKind};
use crate::{
disk_usage_eviction_task::finite_f32,
tenant::storage_layer::{
@@ -205,7 +205,6 @@ fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
/// The outward-facing resources required to build a Timeline
pub struct TimelineResources {
pub remote_client: RemoteTimelineClient,
pub deletion_queue_client: DeletionQueueClient,
pub timeline_get_throttle: Arc<
crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
>,
@@ -910,9 +909,7 @@ impl Timeline {
img: cached_page_img,
};
self.get_impl(key, lsn, reconstruct_state, ctx)
.await
.map(|v| v.0)
self.get_impl(key, lsn, reconstruct_state, ctx).await
}
GetImpl::Vectored => {
let keyspace = KeySpace {
@@ -924,59 +921,16 @@ impl Timeline {
let mut reconstruct_state = ValuesReconstructState::new();
// Only add the cached image to the reconstruct state when it exists.
let cached_page_img_lsn = cached_page_img.as_ref().map(|(lsn, _)| *lsn);
if cached_page_img.is_some() {
let mut key_state = VectoredValueReconstructState::default();
key_state.img = cached_page_img;
reconstruct_state.keys.insert(key, Ok(key_state));
}
let debug_log = {
#[cfg(feature = "testing")]
{
self.get_test_vm_bit_debug_logging()
}
#[cfg(not(feature = "testing"))]
{
false
}
};
if debug_log {
tracing::info!(%key, %lsn, ?cached_page_img_lsn, "debug-logging page reconstruction");
}
if debug_log {
tracing::info!(
location = "before vectored get",
"debug-logging page reconstruction"
);
self.layers
.read()
.await
.layer_map()
.dump(false, ctx)
.await
.unwrap();
}
let vectored_res = self
.get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx)
.await;
if debug_log {
tracing::info!(
location = "before validation",
"debug-logging page reconstruction"
);
self.layers
.read()
.await
.layer_map()
.dump(false, ctx)
.await
.unwrap();
}
if self.conf.validate_vectored_get {
self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
.await;
@@ -994,7 +948,7 @@ impl Timeline {
"Singular vectored get returned wrong key"
)))
} else {
value.map(|v| v.0)
value
}
}
None => Err(PageReconstructError::MissingKey(MissingKeyError {
@@ -1018,7 +972,7 @@ impl Timeline {
lsn: Lsn,
mut reconstruct_state: ValueReconstructState,
ctx: &RequestContext,
) -> Result<(Bytes, ValueReconstructState), PageReconstructError> {
) -> Result<Bytes, PageReconstructError> {
// XXX: structured stats collection for layer eviction here.
trace!(
"get page request for {}@{} from task kind {:?}",
@@ -1157,12 +1111,7 @@ impl Timeline {
}
}
let Ok(res) = res else {
return Err(res.unwrap_err());
};
Ok(BTreeMap::from_iter(
res.into_iter().map(|(k, v)| (k, v.map(|v| v.0))),
))
res
}
/// Scan the keyspace and return all existing key-values in the keyspace. This currently uses vectored
@@ -1226,12 +1175,7 @@ impl Timeline {
recording.observe(throttled);
}
let Ok(vectored_res) = vectored_res else {
return Err(vectored_res.unwrap_err());
};
Ok(BTreeMap::from_iter(
vectored_res.into_iter().map(|(k, v)| (k, v.map(|v| v.0))),
))
vectored_res
}
/// Not subject to [`Self::timeline_get_throttle`].
@@ -1240,10 +1184,7 @@ impl Timeline {
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<
BTreeMap<Key, Result<(Bytes, ValueReconstructState), PageReconstructError>>,
GetVectoredError,
> {
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let mut values = BTreeMap::new();
for range in keyspace.ranges {
@@ -1302,10 +1243,7 @@ impl Timeline {
lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<
BTreeMap<Key, Result<(Bytes, ValueReconstructState), PageReconstructError>>,
GetVectoredError,
> {
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let get_kind = if keyspace.total_raw_size() == 1 {
GetKind::Singular
} else {
@@ -1322,10 +1260,7 @@ impl Timeline {
let reconstruct_timer = crate::metrics::RECONSTRUCT_TIME
.for_get_kind(get_kind)
.start_timer();
let mut results: BTreeMap<
Key,
Result<(Bytes, ValueReconstructState), PageReconstructError>,
> = BTreeMap::new();
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
let layers_visited = reconstruct_state.get_layers_visited();
for (key, res) in std::mem::take(&mut reconstruct_state.keys) {
@@ -1361,10 +1296,7 @@ impl Timeline {
/// Not subject to [`Self::timeline_get_throttle`].
pub(super) async fn validate_get_vectored_impl(
&self,
vectored_res: &Result<
BTreeMap<Key, Result<(Bytes, ValueReconstructState), PageReconstructError>>,
GetVectoredError,
>,
vectored_res: &Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError>,
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
@@ -1437,8 +1369,8 @@ impl Timeline {
key: &Key,
keyspace: &KeySpace,
lsn: Lsn,
(seq, seq_reconstruct_state): &(Bytes, ValueReconstructState),
(vec, vec_reconstruct_state): &(Bytes, ValueReconstructState),
seq: &Bytes,
vec: &Bytes,
) {
if *key == AUX_FILES_KEY {
// The value reconstruct of AUX_FILES_KEY from records is not deterministic
@@ -1461,16 +1393,10 @@ impl Timeline {
}
} else {
// All other keys should reconstruct deterministically, so we simply compare the blobs.
if seq != vec {
assert_eq!(
seq_reconstruct_state, vec_reconstruct_state,
"Reconstruct state mismatch for key {key} - keyspace={keyspace:?} lsn={lsn}"
);
assert_eq!(
seq, vec,
"Image mismatch for key {key} - keyspace={keyspace:?} lsn={lsn}"
);
}
assert_eq!(
seq, vec,
"Image mismatch for key {key} - keyspace={keyspace:?} lsn={lsn}"
);
}
}
@@ -2239,15 +2165,6 @@ impl Timeline {
.unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
}
#[cfg(feature = "testing")]
fn get_test_vm_bit_debug_logging(&self) -> bool {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.test_vm_bit_debug_logging
.unwrap_or(self.conf.default_tenant_conf.test_vm_bit_debug_logging)
}
fn get_image_layer_creation_check_threshold(&self) -> u8 {
let tenant_conf = self.tenant_conf.load();
tenant_conf
@@ -2263,10 +2180,6 @@ impl Timeline {
pub(super) fn tenant_conf_updated(&self, new_conf: &TenantConfOpt) {
// NB: Most tenant conf options are read by background loops, so,
// changes will automatically be picked up.
#[cfg(feature = "testing")]
{
info!(?new_conf.test_vm_bit_debug_logging, "updating tenant conf");
}
// The threshold is embedded in the metric. So, we need to update it.
{
@@ -4442,7 +4355,7 @@ impl Timeline {
let mut total_kb_retrieved = 0;
let mut total_keys_retrieved = 0;
for (k, v) in data {
let (v, _) = v.map_err(CreateImageLayersError::PageReconstructError)?;
let v = v.map_err(CreateImageLayersError::PageReconstructError)?;
total_kb_retrieved += KEY_SIZE + v.len();
total_keys_retrieved += 1;
new_data.insert(k, v);
@@ -4909,7 +4822,7 @@ impl Timeline {
pitr: Duration,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<GcCutoffs> {
) -> Result<GcCutoffs, PageReconstructError> {
let _timer = self
.metrics
.find_gc_cutoffs_histo
@@ -5237,7 +5150,7 @@ impl Timeline {
key: Key,
request_lsn: Lsn,
mut data: ValueReconstructState,
) -> Result<(Bytes, ValueReconstructState), PageReconstructError> {
) -> Result<Bytes, PageReconstructError> {
// Perform WAL redo if needed
data.records.reverse();
@@ -5250,7 +5163,7 @@ impl Timeline {
img_lsn,
request_lsn,
);
Ok((img.clone(), data))
Ok(img.clone())
} else {
Err(PageReconstructError::from(anyhow!(
"base image for {key} at {request_lsn} not found"
@@ -5282,8 +5195,6 @@ impl Timeline {
let last_rec_lsn = data.records.last().unwrap().0;
let ret_state = data.clone();
let img = match self
.walredo_mgr
.as_ref()
@@ -5314,7 +5225,7 @@ impl Timeline {
}
}
Ok((img, ret_state))
Ok(img)
}
}
}

View File

@@ -1064,7 +1064,7 @@ impl Timeline {
img: base_image,
records: delta_above_base_image,
};
let (img, _) = tline.reconstruct_value(key, horizon, state).await?;
let img = tline.reconstruct_value(key, horizon, state).await?;
Ok((keys_above_horizon, img))
}

View File

@@ -11,7 +11,6 @@ use utils::{crashsafe, fs_ext, id::TimelineId, pausable_failpoint};
use crate::{
config::PageServerConf,
deletion_queue::DeletionQueueClient,
task_mgr::{self, TaskKind},
tenant::{
metadata::TimelineMetadata,
@@ -263,7 +262,6 @@ impl DeleteTimelineFlow {
timeline_id: TimelineId,
local_metadata: &TimelineMetadata,
remote_client: RemoteTimelineClient,
deletion_queue_client: DeletionQueueClient,
) -> anyhow::Result<()> {
// Note: here we even skip populating layer map. Timeline is essentially uninitialized.
// RemoteTimelineClient is the only functioning part.
@@ -274,7 +272,6 @@ impl DeleteTimelineFlow {
None, // Ancestor is not needed for deletion.
TimelineResources {
remote_client,
deletion_queue_client,
timeline_get_throttle: tenant.timeline_get_throttle.clone(),
},
// Important. We dont pass ancestor above because it can be missing.

View File

@@ -49,6 +49,19 @@ pub enum NeonWalRecord {
file_path: String,
content: Option<Bytes>,
},
/// A testing record for unit testing purposes. It supports append data to an existing image, or clear it.
#[cfg(test)]
Test {
/// Append a string to the image.
append: String,
/// Clear the image before appending.
clear: bool,
/// Treat this record as an init record. `clear` should be set to true if this field is set
/// to true. This record does not need the history WALs to reconstruct. See [`NeonWalRecord::will_init`] and
/// its references in `timeline.rs`.
will_init: bool,
},
}
impl NeonWalRecord {
@@ -58,11 +71,39 @@ impl NeonWalRecord {
// If you change this function, you'll also need to change ValueBytes::will_init
match self {
NeonWalRecord::Postgres { will_init, rec: _ } => *will_init,
#[cfg(test)]
NeonWalRecord::Test { will_init, .. } => *will_init,
// None of the special neon record types currently initialize the page
_ => false,
}
}
#[cfg(test)]
pub(crate) fn wal_append(s: impl AsRef<str>) -> Self {
Self::Test {
append: s.as_ref().to_string(),
clear: false,
will_init: false,
}
}
#[cfg(test)]
pub(crate) fn wal_clear() -> Self {
Self::Test {
append: "".to_string(),
clear: true,
will_init: false,
}
}
#[cfg(test)]
pub(crate) fn wal_init() -> Self {
Self::Test {
append: "".to_string(),
clear: true,
will_init: true,
}
}
}
/// DecodedBkpBlock represents per-page data contained in a WAL record.

View File

@@ -244,6 +244,20 @@ pub(crate) fn apply_in_neon(
let mut writer = page.writer();
dir.ser_into(&mut writer)?;
}
#[cfg(test)]
NeonWalRecord::Test {
append,
clear,
will_init,
} => {
if *will_init {
assert!(*clear, "init record must be clear to ensure correctness");
}
if *clear {
page.clear();
}
page.put_slice(append.as_bytes());
}
}
Ok(())
}

View File

@@ -1,19 +1,8 @@
From 0b0194a57bd0f3598bd57dbedd0df3932330169d Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Fri, 2 Feb 2024 22:26:45 +0200
Subject: [PATCH 1/1] Make v0.6.0 work with Neon
Now that the WAL-logging happens as a separate step at the end of the
build, we need a few neon-specific hints to make it work.
---
src/hnswbuild.c | 36 ++++++++++++++++++++++++++++++++++++
1 file changed, 36 insertions(+)
diff --git a/src/hnswbuild.c b/src/hnswbuild.c
index 680789b..ec54dea 100644
index dcfb2bd..d5189ee 100644
--- a/src/hnswbuild.c
+++ b/src/hnswbuild.c
@@ -840,9 +840,17 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc)
@@ -860,9 +860,17 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc)
hnswarea = shm_toc_lookup(toc, PARALLEL_KEY_HNSW_AREA, false);
@@ -31,7 +20,7 @@ index 680789b..ec54dea 100644
/* Close relations within worker */
index_close(indexRel, indexLockmode);
table_close(heapRel, heapLockmode);
@@ -1089,13 +1097,41 @@ BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo,
@@ -1117,12 +1125,38 @@ BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo,
SeedRandom(42);
#endif
@@ -43,14 +32,13 @@ index 680789b..ec54dea 100644
BuildGraph(buildstate, forkNum);
- if (RelationNeedsWAL(index) || forkNum == INIT_FORKNUM)
+#ifdef NEON_SMGR
+ smgr_finish_unlogged_build_phase_1(RelationGetSmgr(index));
+#endif
+
if (RelationNeedsWAL(index))
+ {
log_newpage_range(index, forkNum, 0, RelationGetNumberOfBlocks(index), true);
+ if (RelationNeedsWAL(index) || forkNum == INIT_FORKNUM) {
log_newpage_range(index, forkNum, 0, RelationGetNumberOfBlocksInFork(index, forkNum), true);
+#ifdef NEON_SMGR
+ {
+#if PG_VERSION_NUM >= 160000
@@ -60,7 +48,7 @@ index 680789b..ec54dea 100644
+#endif
+
+ SetLastWrittenLSNForBlockRange(XactLastRecEnd, rlocator,
+ MAIN_FORKNUM, 0, RelationGetNumberOfBlocks(index));
+ MAIN_FORKNUM, 0, RelationGetNumberOfBlocks(index));
+ SetLastWrittenLSNForRelation(XactLastRecEnd, rlocator, MAIN_FORKNUM);
+ }
+#endif
@@ -69,10 +57,6 @@ index 680789b..ec54dea 100644
+#ifdef NEON_SMGR
+ smgr_end_unlogged_build(RelationGetSmgr(index));
+#endif
+
FreeBuildState(buildstate);
}
--
2.39.2

View File

@@ -3112,12 +3112,12 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
request_lsn = UINT64_MAX;
/*
* GetRedoStartLsn() returns LSN of basebackup. We know that the SLRU
* GetRedoStartLsn() returns LSN of the basebackup. We know that the SLRU
* segment has not changed since the basebackup, because in order to
* modify it, we would have had to download it already. And once
* downloaded, we never evict SLRU segments from local disk.
*/
not_modified_since = GetRedoStartLsn();
not_modified_since = nm_adjust_lsn(GetRedoStartLsn());
SlruKind kind;

View File

@@ -1,16 +1,183 @@
use measured::FixedCardinalityLabel;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::fmt::{self, Display};
use crate::auth::IpPattern;
use crate::intern::{BranchIdInt, EndpointIdInt, ProjectIdInt};
use crate::proxy::retry::ShouldRetry;
/// Generic error response with human-readable description.
/// Note that we can't always present it to user as is.
#[derive(Debug, Deserialize)]
pub struct ConsoleError {
pub error: Box<str>,
#[serde(skip)]
pub http_status_code: http::StatusCode,
pub status: Option<Status>,
}
impl ConsoleError {
pub fn get_reason(&self) -> Reason {
self.status
.as_ref()
.and_then(|s| s.details.error_info.as_ref())
.map(|e| e.reason)
.unwrap_or(Reason::Unknown)
}
pub fn get_user_facing_message(&self) -> String {
use super::provider::errors::REQUEST_FAILED;
self.status
.as_ref()
.and_then(|s| s.details.user_facing_message.as_ref())
.map(|m| m.message.clone().into())
.unwrap_or_else(|| {
// Ask @neondatabase/control-plane for review before adding more.
match self.http_status_code {
http::StatusCode::NOT_FOUND => {
// Status 404: failed to get a project-related resource.
format!("{REQUEST_FAILED}: endpoint cannot be found")
}
http::StatusCode::NOT_ACCEPTABLE => {
// Status 406: endpoint is disabled (we don't allow connections).
format!("{REQUEST_FAILED}: endpoint is disabled")
}
http::StatusCode::LOCKED | http::StatusCode::UNPROCESSABLE_ENTITY => {
// Status 423: project might be in maintenance mode (or bad state), or quotas exceeded.
format!("{REQUEST_FAILED}: endpoint is temporarily unavailable. Check your quotas and/or contact our support.")
}
_ => REQUEST_FAILED.to_owned(),
}
})
}
}
impl Display for ConsoleError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let msg = self
.status
.as_ref()
.and_then(|s| s.details.user_facing_message.as_ref())
.map(|m| m.message.as_ref())
.unwrap_or_else(|| &self.error);
write!(f, "{}", msg)
}
}
impl ShouldRetry for ConsoleError {
fn could_retry(&self) -> bool {
if self.status.is_none() || self.status.as_ref().unwrap().details.retry_info.is_none() {
// retry some temporary failures because the compute was in a bad state
// (bad request can be returned when the endpoint was in transition)
return match &self {
ConsoleError {
http_status_code: http::StatusCode::BAD_REQUEST,
..
} => true,
// don't retry when quotas are exceeded
ConsoleError {
http_status_code: http::StatusCode::UNPROCESSABLE_ENTITY,
ref error,
..
} => !error.contains("compute time quota of non-primary branches is exceeded"),
// locked can be returned when the endpoint was in transition
// or when quotas are exceeded. don't retry when quotas are exceeded
ConsoleError {
http_status_code: http::StatusCode::LOCKED,
ref error,
..
} => {
!error.contains("quota exceeded")
&& !error.contains("the limit for current plan reached")
}
_ => false,
};
}
// retry if the response has a retry delay
if let Some(retry_info) = self
.status
.as_ref()
.and_then(|s| s.details.retry_info.as_ref())
{
retry_info.retry_delay_ms > 0
} else {
false
}
}
}
#[derive(Debug, Deserialize)]
pub struct Status {
pub code: Box<str>,
pub message: Box<str>,
pub details: Details,
}
#[derive(Debug, Deserialize)]
pub struct Details {
pub error_info: Option<ErrorInfo>,
pub retry_info: Option<RetryInfo>,
pub user_facing_message: Option<UserFacingMessage>,
}
#[derive(Debug, Deserialize)]
pub struct ErrorInfo {
pub reason: Reason,
// The schema could also have a `metadata` field, but it's not structured. Skip it for now.
}
#[derive(Clone, Copy, Debug, Deserialize, Default)]
pub enum Reason {
#[serde(rename = "ROLE_PROTECTED")]
RoleProtected,
#[serde(rename = "RESOURCE_NOT_FOUND")]
ResourceNotFound,
#[serde(rename = "PROJECT_NOT_FOUND")]
ProjectNotFound,
#[serde(rename = "ENDPOINT_NOT_FOUND")]
EndpointNotFound,
#[serde(rename = "BRANCH_NOT_FOUND")]
BranchNotFound,
#[serde(rename = "RATE_LIMIT_EXCEEDED")]
RateLimitExceeded,
#[serde(rename = "NON_PRIMARY_BRANCH_COMPUTE_TIME_EXCEEDED")]
NonPrimaryBranchComputeTimeExceeded,
#[serde(rename = "ACTIVE_TIME_QUOTA_EXCEEDED")]
ActiveTimeQuotaExceeded,
#[serde(rename = "COMPUTE_TIME_QUOTA_EXCEEDED")]
ComputeTimeQuotaExceeded,
#[serde(rename = "WRITTEN_DATA_QUOTA_EXCEEDED")]
WrittenDataQuotaExceeded,
#[serde(rename = "DATA_TRANSFER_QUOTA_EXCEEDED")]
DataTransferQuotaExceeded,
#[serde(rename = "LOGICAL_SIZE_QUOTA_EXCEEDED")]
LogicalSizeQuotaExceeded,
#[default]
#[serde(other)]
Unknown,
}
impl Reason {
pub fn is_not_found(&self) -> bool {
matches!(
self,
Reason::ResourceNotFound
| Reason::ProjectNotFound
| Reason::EndpointNotFound
| Reason::BranchNotFound
)
}
}
#[derive(Debug, Deserialize)]
pub struct RetryInfo {
pub retry_delay_ms: u64,
}
#[derive(Debug, Deserialize)]
pub struct UserFacingMessage {
pub message: Box<str>,
}
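// Editor's sketch (not part of this diff): `#[serde(other)]` on `Unknown`
// makes any unrecognized reason string deserialize to `Reason::Unknown`
// instead of failing, so older proxies tolerate reasons added later.
#[cfg(test)]
mod reason_fallback_test {
    use super::*;

    #[test]
    fn unrecognized_reason_falls_back_to_unknown() {
        let info: ErrorInfo =
            serde_json::from_str(r#"{"reason": "SOME_FUTURE_REASON"}"#).unwrap();
        assert!(matches!(info.reason, Reason::Unknown));
    }
}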
/// Response which holds client's auth secret, e.g. [`crate::scram::ServerSecret`].

View File

@@ -25,8 +25,8 @@ use tracing::info;
pub mod errors {
use crate::{
console::messages::{self, ConsoleError},
error::{io_error, ReportableError, UserFacingError},
http,
proxy::retry::ShouldRetry,
};
use thiserror::Error;
@@ -34,17 +34,14 @@ pub mod errors {
use super::ApiLockError;
/// A go-to error message which doesn't leak any detail.
const REQUEST_FAILED: &str = "Console request failed";
pub const REQUEST_FAILED: &str = "Console request failed";
/// Common console API error.
#[derive(Debug, Error)]
pub enum ApiError {
/// Error returned by the console itself.
#[error("{REQUEST_FAILED} with {}: {}", .status, .text)]
Console {
status: http::StatusCode,
text: Box<str>,
},
#[error("{REQUEST_FAILED} with {0}")]
Console(ConsoleError),
/// Various IO errors like broken pipe or malformed payload.
#[error("{REQUEST_FAILED}: {0}")]
@@ -53,11 +50,11 @@ pub mod errors {
impl ApiError {
/// Returns HTTP status code if it's the reason for failure.
pub fn http_status_code(&self) -> Option<http::StatusCode> {
pub fn get_reason(&self) -> messages::Reason {
use ApiError::*;
match self {
Console { status, .. } => Some(*status),
_ => None,
Console(e) => e.get_reason(),
_ => messages::Reason::Unknown,
}
}
}
@@ -67,22 +64,7 @@ pub mod errors {
use ApiError::*;
match self {
// To minimize risks, only select errors are forwarded to users.
// Ask @neondatabase/control-plane for review before adding more.
Console { status, .. } => match *status {
http::StatusCode::NOT_FOUND => {
// Status 404: failed to get a project-related resource.
format!("{REQUEST_FAILED}: endpoint cannot be found")
}
http::StatusCode::NOT_ACCEPTABLE => {
// Status 406: endpoint is disabled (we don't allow connections).
format!("{REQUEST_FAILED}: endpoint is disabled")
}
http::StatusCode::LOCKED | http::StatusCode::UNPROCESSABLE_ENTITY => {
// Status 423 or 422: project might be in maintenance mode (or in a bad state), or quotas are exceeded.
format!("{REQUEST_FAILED}: endpoint is temporarily unavailable. Check your quotas and/or contact our support.")
}
_ => REQUEST_FAILED.to_owned(),
},
Console(c) => c.get_user_facing_message(),
_ => REQUEST_FAILED.to_owned(),
}
}
@@ -91,29 +73,56 @@ pub mod errors {
impl ReportableError for ApiError {
fn get_error_kind(&self) -> crate::error::ErrorKind {
match self {
ApiError::Console {
status: http::StatusCode::NOT_FOUND | http::StatusCode::NOT_ACCEPTABLE,
..
} => crate::error::ErrorKind::User,
ApiError::Console {
status: http::StatusCode::UNPROCESSABLE_ENTITY,
text,
} if text.contains("compute time quota of non-primary branches is exceeded") => {
crate::error::ErrorKind::User
ApiError::Console(e) => {
use crate::error::ErrorKind::*;
match e.get_reason() {
crate::console::messages::Reason::RoleProtected => User,
crate::console::messages::Reason::ResourceNotFound => User,
crate::console::messages::Reason::ProjectNotFound => User,
crate::console::messages::Reason::EndpointNotFound => User,
crate::console::messages::Reason::BranchNotFound => User,
crate::console::messages::Reason::RateLimitExceeded => ServiceRateLimit,
crate::console::messages::Reason::NonPrimaryBranchComputeTimeExceeded => {
User
}
crate::console::messages::Reason::ActiveTimeQuotaExceeded => User,
crate::console::messages::Reason::ComputeTimeQuotaExceeded => User,
crate::console::messages::Reason::WrittenDataQuotaExceeded => User,
crate::console::messages::Reason::DataTransferQuotaExceeded => User,
crate::console::messages::Reason::LogicalSizeQuotaExceeded => User,
crate::console::messages::Reason::Unknown => match &e {
ConsoleError {
http_status_code:
http::StatusCode::NOT_FOUND | http::StatusCode::NOT_ACCEPTABLE,
..
} => crate::error::ErrorKind::User,
ConsoleError {
http_status_code: http::StatusCode::UNPROCESSABLE_ENTITY,
error,
..
} if error.contains(
"compute time quota of non-primary branches is exceeded",
) =>
{
crate::error::ErrorKind::User
}
ConsoleError {
http_status_code: http::StatusCode::LOCKED,
error,
..
} if error.contains("quota exceeded")
|| error.contains("the limit for current plan reached") =>
{
crate::error::ErrorKind::User
}
ConsoleError {
http_status_code: http::StatusCode::TOO_MANY_REQUESTS,
..
} => crate::error::ErrorKind::ServiceRateLimit,
ConsoleError { .. } => crate::error::ErrorKind::ControlPlane,
},
}
}
ApiError::Console {
status: http::StatusCode::LOCKED,
text,
} if text.contains("quota exceeded")
|| text.contains("the limit for current plan reached") =>
{
crate::error::ErrorKind::User
}
ApiError::Console {
status: http::StatusCode::TOO_MANY_REQUESTS,
..
} => crate::error::ErrorKind::ServiceRateLimit,
ApiError::Console { .. } => crate::error::ErrorKind::ControlPlane,
ApiError::Transport(_) => crate::error::ErrorKind::ControlPlane,
}
}
@@ -124,31 +133,7 @@ pub mod errors {
match self {
// retry some transport errors
Self::Transport(io) => io.could_retry(),
// retry some temporary failures because the compute was in a bad state
// (bad request can be returned when the endpoint was in transition)
Self::Console {
status: http::StatusCode::BAD_REQUEST,
..
} => true,
// don't retry when quotas are exceeded
Self::Console {
status: http::StatusCode::UNPROCESSABLE_ENTITY,
ref text,
} => !text.contains("compute time quota of non-primary branches is exceeded"),
// locked can be returned when the endpoint was in transition
// or when quotas are exceeded. don't retry when quotas are exceeded
Self::Console {
status: http::StatusCode::LOCKED,
ref text,
} => {
// written data quota exceeded
// data transfer quota exceeded
// compute time quota exceeded
// logical size quota exceeded
!text.contains("quota exceeded")
&& !text.contains("the limit for current plan reached")
}
_ => false,
Self::Console(e) => e.could_retry(),
}
}
}
@@ -509,7 +494,7 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
self.metrics
.semaphore_acquire_seconds
.observe(now.elapsed().as_secs_f64());
info!("acquired permit {:?}", now.elapsed().as_secs_f64());
Ok(WakeComputePermit { permit: permit? })
}

View File

@@ -94,12 +94,14 @@ impl Api {
let body = match parse_body::<GetRoleSecret>(response).await {
Ok(body) => body,
// Error 404 is special: it's ok not to have a secret.
Err(e) => match e.http_status_code() {
Some(http::StatusCode::NOT_FOUND) => {
// TODO(anna): retry
Err(e) => {
if e.get_reason().is_not_found() {
return Ok(AuthInfo::default());
} else {
return Err(e.into());
}
_otherwise => return Err(e.into()),
},
}
};
let secret = if body.role_secret.is_empty() {
@@ -328,19 +330,24 @@ async fn parse_body<T: for<'a> serde::Deserialize<'a>>(
info!("request succeeded, processing the body");
return Ok(response.json().await?);
}
let s = response.bytes().await?;
// Log the plaintext body so we can detect cases that are not covered by the error struct.
info!("response_error plaintext: {:?}", s);
// Don't throw an error here because it's not as important
// as the fact that the request itself has failed.
let body = response.json().await.unwrap_or_else(|e| {
let mut body = serde_json::from_slice(&s).unwrap_or_else(|e| {
warn!("failed to parse error body: {e}");
ConsoleError {
error: "reason unclear (malformed error message)".into(),
http_status_code: status,
status: None,
}
});
body.http_status_code = status;
let text = body.error;
error!("console responded with an error ({status}): {text}");
Err(ApiError::Console { status, text })
error!("console responded with an error ({status}): {body:?}");
Err(ApiError::Console(body))
}
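// Editor's sketch (not part of this diff) of the fallback pattern above:
// parse the JSON error body, substitute a stub when it is malformed, then
// stamp the transport-level HTTP status onto the `#[serde(skip)]` field.
// `ParsedError` is a simplified stand-in for `ConsoleError`.
mod parse_sketch {
    use serde::Deserialize;

    #[derive(Debug, Deserialize)]
    pub struct ParsedError {
        pub error: Box<str>,
        #[serde(skip)] // never present in the body; filled in afterwards
        pub http_status: u16,
    }

    pub fn parse_error_body(status: u16, bytes: &[u8]) -> ParsedError {
        let mut body: ParsedError =
            serde_json::from_slice(bytes).unwrap_or_else(|_| ParsedError {
                error: "reason unclear (malformed error message)".into(),
                http_status: 0,
            });
        // the transport status is authoritative even when the body parsed fine
        body.http_status = status;
        body
    }
}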
fn parse_host_port(input: &str) -> Option<(&str, u16)> {

View File

@@ -12,7 +12,7 @@ use crate::auth::backend::{
};
use crate::config::{CertResolver, RetryConfig};
use crate::console::caches::NodeInfoCache;
use crate::console::messages::MetricsAuxInfo;
use crate::console::messages::{ConsoleError, MetricsAuxInfo};
use crate::console::provider::{CachedAllowedIps, CachedRoleSecret, ConsoleBackend};
use crate::console::{self, CachedNodeInfo, NodeInfo};
use crate::error::ErrorKind;
@@ -484,18 +484,20 @@ impl TestBackend for TestConnectMechanism {
match action {
ConnectAction::Wake => Ok(helper_create_cached_node_info(self.cache)),
ConnectAction::WakeFail => {
let err = console::errors::ApiError::Console {
status: http::StatusCode::FORBIDDEN,
text: "TEST".into(),
};
let err = console::errors::ApiError::Console(ConsoleError {
http_status_code: http::StatusCode::FORBIDDEN,
error: "TEST".into(),
status: None,
});
assert!(!err.could_retry());
Err(console::errors::WakeComputeError::ApiError(err))
}
ConnectAction::WakeRetry => {
let err = console::errors::ApiError::Console {
status: http::StatusCode::BAD_REQUEST,
text: "TEST".into(),
};
let err = console::errors::ApiError::Console(ConsoleError {
http_status_code: http::StatusCode::BAD_REQUEST,
error: "TEST".into(),
status: None,
});
assert!(err.could_retry());
Err(console::errors::WakeComputeError::ApiError(err))
}

View File

@@ -1,4 +1,5 @@
use crate::config::RetryConfig;
use crate::console::messages::ConsoleError;
use crate::console::{errors::WakeComputeError, provider::CachedNodeInfo};
use crate::context::RequestMonitoring;
use crate::metrics::{
@@ -88,36 +89,76 @@ fn report_error(e: &WakeComputeError, retry: bool) {
let kind = match e {
WakeComputeError::BadComputeAddress(_) => WakeupFailureKind::BadComputeAddress,
WakeComputeError::ApiError(ApiError::Transport(_)) => WakeupFailureKind::ApiTransportError,
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::LOCKED,
ref text,
}) if text.contains("written data quota exceeded")
|| text.contains("the limit for current plan reached") =>
{
WakeupFailureKind::QuotaExceeded
}
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::UNPROCESSABLE_ENTITY,
ref text,
}) if text.contains("compute time quota of non-primary branches is exceeded") => {
WakeupFailureKind::QuotaExceeded
}
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::LOCKED,
..
}) => WakeupFailureKind::ApiConsoleLocked,
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::BAD_REQUEST,
..
}) => WakeupFailureKind::ApiConsoleBadRequest,
WakeComputeError::ApiError(ApiError::Console { status, .. })
if status.is_server_error() =>
{
WakeupFailureKind::ApiConsoleOtherServerError
}
WakeComputeError::ApiError(ApiError::Console { .. }) => {
WakeupFailureKind::ApiConsoleOtherError
}
WakeComputeError::ApiError(ApiError::Console(e)) => match e.get_reason() {
crate::console::messages::Reason::RoleProtected => {
WakeupFailureKind::ApiConsoleBadRequest
}
crate::console::messages::Reason::ResourceNotFound => {
WakeupFailureKind::ApiConsoleBadRequest
}
crate::console::messages::Reason::ProjectNotFound => {
WakeupFailureKind::ApiConsoleBadRequest
}
crate::console::messages::Reason::EndpointNotFound => {
WakeupFailureKind::ApiConsoleBadRequest
}
crate::console::messages::Reason::BranchNotFound => {
WakeupFailureKind::ApiConsoleBadRequest
}
crate::console::messages::Reason::RateLimitExceeded => {
WakeupFailureKind::ApiConsoleLocked
}
crate::console::messages::Reason::NonPrimaryBranchComputeTimeExceeded => {
WakeupFailureKind::QuotaExceeded
}
crate::console::messages::Reason::ActiveTimeQuotaExceeded => {
WakeupFailureKind::QuotaExceeded
}
crate::console::messages::Reason::ComputeTimeQuotaExceeded => {
WakeupFailureKind::QuotaExceeded
}
crate::console::messages::Reason::WrittenDataQuotaExceeded => {
WakeupFailureKind::QuotaExceeded
}
crate::console::messages::Reason::DataTransferQuotaExceeded => {
WakeupFailureKind::QuotaExceeded
}
crate::console::messages::Reason::LogicalSizeQuotaExceeded => {
WakeupFailureKind::QuotaExceeded
}
crate::console::messages::Reason::Unknown => match e {
ConsoleError {
http_status_code: StatusCode::LOCKED,
ref error,
..
} if error.contains("written data quota exceeded")
|| error.contains("the limit for current plan reached") =>
{
WakeupFailureKind::QuotaExceeded
}
ConsoleError {
http_status_code: StatusCode::UNPROCESSABLE_ENTITY,
ref error,
..
} if error.contains("compute time quota of non-primary branches is exceeded") => {
WakeupFailureKind::QuotaExceeded
}
ConsoleError {
http_status_code: StatusCode::LOCKED,
..
} => WakeupFailureKind::ApiConsoleLocked,
ConsoleError {
http_status_code: StatusCode::BAD_REQUEST,
..
} => WakeupFailureKind::ApiConsoleBadRequest,
ConsoleError {
http_status_code, ..
} if http_status_code.is_server_error() => {
WakeupFailureKind::ApiConsoleOtherServerError
}
ConsoleError { .. } => WakeupFailureKind::ApiConsoleOtherError,
},
},
WakeComputeError::TooManyConnections => WakeupFailureKind::ApiConsoleLocked,
WakeComputeError::TooManyConnectionAttempts(_) => WakeupFailureKind::TimeoutError,
};

View File

@@ -1,5 +1,3 @@
use std::usize;
use super::{LimitAlgorithm, Outcome, Sample};
/// Loss-based congestion avoidance.

View File

@@ -32,8 +32,6 @@ pub struct ClientFirstMessage<'a> {
pub bare: &'a str,
/// Channel binding mode.
pub cbind_flag: ChannelBinding<&'a str>,
/// (Client username)[<https://github.com/postgres/postgres/blob/94226d4506e66d6e7cbf/src/backend/libpq/auth-scram.c#L13>].
pub username: &'a str,
/// Client nonce.
pub nonce: &'a str,
}
@@ -58,6 +56,14 @@ impl<'a> ClientFirstMessage<'a> {
// In theory, these might be preceded by "reserved-mext" (i.e. "m=")
let username = parts.next()?.strip_prefix("n=")?;
// https://github.com/postgres/postgres/blob/f83908798f78c4cafda217ca875602c88ea2ae28/src/backend/libpq/auth-scram.c#L13-L14
if !username.is_empty() {
tracing::warn!(username, "scram username provided, but is not expected")
// TODO(conrad):
// return None;
}
let nonce = parts.next()?.strip_prefix("r=")?;
// Validate but ignore auth extensions
@@ -66,7 +72,6 @@ impl<'a> ClientFirstMessage<'a> {
Some(Self {
bare,
cbind_flag,
username,
nonce,
})
}
@@ -188,19 +193,18 @@ mod tests {
// (Almost) real strings captured during debug sessions
let cases = [
(NotSupportedClient, "n,,n=pepe,r=t8JwklwKecDLwSsA72rHmVju"),
(NotSupportedServer, "y,,n=pepe,r=t8JwklwKecDLwSsA72rHmVju"),
(NotSupportedClient, "n,,n=,r=t8JwklwKecDLwSsA72rHmVju"),
(NotSupportedServer, "y,,n=,r=t8JwklwKecDLwSsA72rHmVju"),
(
Required("tls-server-end-point"),
"p=tls-server-end-point,,n=pepe,r=t8JwklwKecDLwSsA72rHmVju",
"p=tls-server-end-point,,n=,r=t8JwklwKecDLwSsA72rHmVju",
),
];
for (cb, input) in cases {
let msg = ClientFirstMessage::parse(input).unwrap();
assert_eq!(msg.bare, "n=pepe,r=t8JwklwKecDLwSsA72rHmVju");
assert_eq!(msg.username, "pepe");
assert_eq!(msg.bare, "n=,r=t8JwklwKecDLwSsA72rHmVju");
assert_eq!(msg.nonce, "t8JwklwKecDLwSsA72rHmVju");
assert_eq!(msg.cbind_flag, cb);
}
@@ -208,14 +212,13 @@ mod tests {
#[test]
fn parse_client_first_message_with_invalid_gs2_authz() {
assert!(ClientFirstMessage::parse("n,authzid,n=user,r=nonce").is_none())
assert!(ClientFirstMessage::parse("n,authzid,n=,r=nonce").is_none())
}
#[test]
fn parse_client_first_message_with_extra_params() {
let msg = ClientFirstMessage::parse("n,,n=user,r=nonce,a=foo,b=bar,c=baz").unwrap();
assert_eq!(msg.bare, "n=user,r=nonce,a=foo,b=bar,c=baz");
assert_eq!(msg.username, "user");
let msg = ClientFirstMessage::parse("n,,n=,r=nonce,a=foo,b=bar,c=baz").unwrap();
assert_eq!(msg.bare, "n=,r=nonce,a=foo,b=bar,c=baz");
assert_eq!(msg.nonce, "nonce");
assert_eq!(msg.cbind_flag, ChannelBinding::NotSupportedClient);
}
@@ -223,9 +226,9 @@ mod tests {
#[test]
fn parse_client_first_message_with_extra_params_invalid() {
// must be of the form `<ascii letter>=<...>`
assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,abc=foo").is_none());
assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,1=foo").is_none());
assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,a").is_none());
assert!(ClientFirstMessage::parse("n,,n=,r=nonce,abc=foo").is_none());
assert!(ClientFirstMessage::parse("n,,n=,r=nonce,1=foo").is_none());
assert!(ClientFirstMessage::parse("n,,n=,r=nonce,a").is_none());
}
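// Editor's sketch (not part of this diff): the GS2 framing these tests
// exercise. A client-first-message is "<gs2-header><bare>", and the header
// is "<cbind-flag>,<authzid>,". This hypothetical splitter mirrors the
// rejection of an authorization identity seen in the test above.
fn split_gs2(msg: &str) -> Option<(&str, &str)> {
    let (cbind_flag, rest) = msg.split_once(',')?; // "n", "y", or "p=<type>"
    let (authzid, bare) = rest.split_once(',')?;
    if !authzid.is_empty() {
        return None; // e.g. "n,authzid,..." is rejected
    }
    Some((cbind_flag, bare))
}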
#[test]

View File

@@ -1,5 +1,5 @@
[toolchain]
channel = "1.78.0"
channel = "1.79.0"
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html

View File

@@ -2409,11 +2409,17 @@ impl Service {
(detach_waiters, shard_ids, node.clone())
};
if let Err(e) = self.await_waiters(detach_waiters, RECONCILE_TIMEOUT).await {
// Failing to detach shouldn't hold up deletion, e.g. if a node is offline we should be able
// to use some other node to run the remote deletion.
tracing::warn!("Failed to detach some locations: {e}");
}
// This reconcile wait can fail in a few ways:
// A: there is a very long queue for the reconciler semaphore
// B: some pageserver is failing to handle a detach promptly
// C: some pageserver goes offline right at the moment we send it a request.
//
// A and C are transient: the semaphore will eventually become available, and once a node is marked offline
// the next attempt to reconcile will silently skip detaches for an offline node and succeed. If B happens,
// it's a bug, and needs resolving at the pageserver level (we shouldn't just leave attachments behind while
// deleting the underlying data).
self.await_waiters(detach_waiters, RECONCILE_TIMEOUT)
.await?;
let locations = shard_ids
.into_iter()
@@ -2431,13 +2437,11 @@ impl Service {
for result in results {
match result {
Ok(StatusCode::ACCEPTED) => {
// This could happen if we failed detach above, and hit a pageserver where the tenant
// is still attached: it will accept the deletion in the background
tracing::warn!(
"Unexpectedly still attached on {}, client should retry",
// This should never happen: we waited for detaches to finish above
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Unexpectedly still attached on {}",
node
);
return Ok(StatusCode::ACCEPTED);
)));
}
Ok(_) => {}
Err(mgmt_api::Error::Cancelled) => {

View File

@@ -94,8 +94,6 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
".*WARN.*path=/v1/utilization .*request was dropped before completing",
# Can happen during shutdown
".*scheduling deletion on drop failed: queue is in state Stopped.*",
# Can happen during shutdown
".*ignoring failure to find gc cutoffs: timeline shutting down.*",
)

View File

@@ -0,0 +1,15 @@
DROP TABLE IF EXISTS halfvec_test_table;
CREATE TABLE halfvec_test_table (
_id text NOT NULL,
title text,
text text,
embeddings halfvec(1536),
PRIMARY KEY (_id)
);
INSERT INTO halfvec_test_table (_id, title, text, embeddings)
SELECT _id, title, text, embeddings::halfvec
FROM documents;
CREATE INDEX documents_half_precision_hnsw_idx ON halfvec_test_table USING hnsw (embeddings halfvec_cosine_ops) WITH (m = 64, ef_construction = 128);

View File

@@ -0,0 +1,13 @@
-- run with pooled connection
-- pgbench -T 300 -c 100 -j20 -f pgbench_halfvec_queries.sql "postgresql://neondb_owner:<secret>@ep-floral-thunder-w1gzhaxi-pooler.eu-west-1.aws.neon.build/neondb?sslmode=require"
with x (x) as (
select "embeddings" as x
from halfvec_test_table
TABLESAMPLE SYSTEM (1)
LIMIT 1
)
SELECT title, "embeddings" <=> (select x from x) as distance
FROM halfvec_test_table
ORDER BY 2
LIMIT 30;

View File

@@ -1,13 +0,0 @@
-- run with pooled connection
-- pgbench -T 300 -c 100 -j20 -f pgbench_hnsw_queries.sql "postgresql://neondb_owner:<secret>@ep-floral-thunder-w1gzhaxi-pooler.eu-west-1.aws.neon.build/neondb?sslmode=require"
with x (x) as (
select "embeddings" as x
from hnsw_test_table
TABLESAMPLE SYSTEM (1)
LIMIT 1
)
SELECT title, "embeddings" <=> (select x from x) as distance
FROM hnsw_test_table
ORDER BY 2
LIMIT 30;

View File

@@ -106,6 +106,7 @@ QUERIES: Tuple[LabelledQuery, ...] = (
# Disable auto formatting for the list of queries so that it's easier to read
# fmt: off
PGVECTOR_QUERIES: Tuple[LabelledQuery, ...] = (
LabelledQuery("PGVPREP", r"ALTER EXTENSION VECTOR UPDATE;"),
LabelledQuery("PGV0", r"DROP TABLE IF EXISTS hnsw_test_table;"),
LabelledQuery("PGV1", r"CREATE TABLE hnsw_test_table AS TABLE documents WITH NO DATA;"),
LabelledQuery("PGV2", r"INSERT INTO hnsw_test_table SELECT * FROM documents;"),
@@ -115,6 +116,10 @@ PGVECTOR_QUERIES: Tuple[LabelledQuery, ...] = (
LabelledQuery("PGV6", r"CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_l1_ops);"),
LabelledQuery("PGV7", r"CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_hamming_ops);"),
LabelledQuery("PGV8", r"CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_jaccard_ops);"),
LabelledQuery("PGV9", r"DROP TABLE IF EXISTS halfvec_test_table;"),
LabelledQuery("PGV10", r"CREATE TABLE halfvec_test_table (_id text NOT NULL, title text, text text, embeddings halfvec(1536), PRIMARY KEY (_id));"),
LabelledQuery("PGV11", r"INSERT INTO halfvec_test_table (_id, title, text, embeddings) SELECT _id, title, text, embeddings::halfvec FROM documents;"),
LabelledQuery("PGV12", r"CREATE INDEX documents_half_precision_hnsw_idx ON halfvec_test_table USING hnsw (embeddings halfvec_cosine_ops) WITH (m = 64, ef_construction = 128);"),
)
# fmt: on

View File

@@ -18,6 +18,7 @@ class PgBenchLoadType(enum.Enum):
SIMPLE_UPDATE = "simple-update"
SELECT_ONLY = "select-only"
PGVECTOR_HNSW = "pgvector-hnsw"
PGVECTOR_HALFVEC = "pgvector-halfvec"
def utc_now_timestamp() -> int:
@@ -153,6 +154,26 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P
password=password,
)
if workload_type == PgBenchLoadType.PGVECTOR_HALFVEC:
# Run the pgvector halfvec queries workload
run_pgbench(
env,
"pgvector-halfvec",
[
"pgbench",
"-f",
"test_runner/performance/pgvector/pgbench_custom_script_pgvector_halfvec_queries.sql",
"-c100",
"-j20",
f"-T{duration}",
"-P2",
"--protocol=prepared",
"--progress-timestamp",
connstr,
],
password=password,
)
env.report_size()
@@ -222,13 +243,3 @@ def test_pgbench_remote_simple_update(remote_compare: PgCompare, scale: int, dur
@pytest.mark.remote_cluster
def test_pgbench_remote_select_only(remote_compare: PgCompare, scale: int, duration: int):
run_test_pgbench(remote_compare, scale, duration, PgBenchLoadType.SELECT_ONLY)
# The following test runs on an existing database that has the pgvector extension installed
# and a table with 1 million embedding vectors loaded and indexed with HNSW.
#
# Run this pgbench test against an existing remote Postgres cluster with the necessary setup.
@pytest.mark.parametrize("duration", get_durations_matrix())
@pytest.mark.remote_cluster
def test_pgbench_remote_pgvector(remote_compare: PgCompare, duration: int):
run_test_pgbench(remote_compare, 1, duration, PgBenchLoadType.PGVECTOR_HNSW)

View File

@@ -0,0 +1,24 @@
import pytest
from fixtures.compare_fixtures import PgCompare
from performance.test_perf_pgbench import PgBenchLoadType, get_durations_matrix, run_test_pgbench
# The following test runs on an existing database that has the pgvector extension installed
# and a table with 1 million embedding vectors loaded and indexed with HNSW.
#
# Run this pgbench test against an existing remote Postgres cluster with the necessary setup.
@pytest.mark.parametrize("duration", get_durations_matrix())
@pytest.mark.remote_cluster
def test_pgbench_remote_pgvector_hnsw(remote_compare: PgCompare, duration: int):
run_test_pgbench(remote_compare, 1, duration, PgBenchLoadType.PGVECTOR_HNSW)
# The following test runs on an existing database that has the pgvector extension installed
# and a table with 1 million embedding vectors loaded and indexed with halfvec.
#
# Run this pgbench test against an existing remote Postgres cluster with the necessary setup.
@pytest.mark.parametrize("duration", get_durations_matrix())
@pytest.mark.remote_cluster
def test_pgbench_remote_pgvector_halfvec(remote_compare: PgCompare, duration: int):
run_test_pgbench(remote_compare, 1, duration, PgBenchLoadType.PGVECTOR_HALFVEC)

View File

@@ -300,7 +300,7 @@ def test_replica_query_race(neon_simple_env: NeonEnv):
p_cur.execute("CREATE TABLE test AS SELECT 0 AS counter")
standby_ep = env.endpoints.new_replica_start(origin=primary_ep, endpoint_id="standby")
time.sleep(1)
wait_replica_caughtup(primary_ep, standby_ep)
# In the primary, run a lot of UPDATEs on a single page
finished = False

View File

@@ -129,3 +129,33 @@ def test_ondemand_download_replica(neon_env_builder: NeonEnvBuilder, shard_count
cur_replica = conn_replica.cursor()
cur_replica.execute("SELECT * FROM clogtest")
assert cur_replica.fetchall() == [(1,), (3,)]
def test_ondemand_download_after_wal_switch(neon_env_builder: NeonEnvBuilder):
"""
Test on-demand SLRU download on a standby that starts right after
a WAL segment switch.
This is a repro for a bug in how the LSN at a WAL page/segment
boundary was handled (https://github.com/neondatabase/neon/issues/8030)
"""
tenant_conf = {
"lazy_slru_download": "true",
}
env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)
endpoint = env.endpoints.create_start("main")
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
# Create a test table
cur.execute("CREATE TABLE clogtest (id integer)")
cur.execute("INSERT INTO clogtest VALUES (1)")
# Start standby at WAL segment boundary
cur.execute("SELECT pg_switch_wal()")
lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_insert_lsn()"))
_endpoint_at_lsn = env.endpoints.create_start(
branch_name="main", endpoint_id="ep-at-lsn", lsn=lsn
)

View File

@@ -133,6 +133,9 @@ def test_storage_controller_smoke(
wait_until(10, 1, lambda: node_evacuated(env.pageservers[0].id))
# Let all the reconciliations after marking the node offline complete
env.storage_controller.reconcile_until_idle()
# Marking pageserver active should not migrate anything to it
# immediately
env.storage_controller.node_configure(env.pageservers[0].id, {"availability": "Active"})

View File

@@ -678,10 +678,6 @@ def test_synthetic_size_while_deleting(neon_env_builder: NeonEnvBuilder):
with pytest.raises(PageserverApiException, match=matcher):
completion.result()
# this happens in both cases
env.pageserver.allowed_errors.append(
".*ignoring failure to find gc cutoffs: timeline shutting down.*"
)
# this happens only in the case of deletion (http response logging)
env.pageserver.allowed_errors.append(".*Failed to refresh gc_info before gathering inputs.*")

View File

@@ -287,13 +287,6 @@ def test_vm_bit_clear_on_heap_lock_blackbox(neon_env_builder: NeonEnvBuilder):
# already truncated away.
#
# ERROR: could not access status of transaction 1027
# Debugging https://github.com/neondatabase/neon/issues/6967:
# the select() below fails occasionally at get_impl="vectored" validation
env.pageserver.http_client().patch_tenant_config_client_side(
tenant_id,
{"test_vm_bit_debug_logging": True},
)
cur.execute("select xmin, xmax, * from vmtest_lock where id = 40000 for update")
tup = cur.fetchall()
log.info(f"tuple = {tup}")

View File

@@ -601,13 +601,16 @@ async def run_segment_init_failure(env: NeonEnv):
conn = await ep.connect_async()
ep.safe_psql("select pg_switch_wal()") # jump to the segment boundary
# next insertion should hang until failpoint is disabled.
asyncio.create_task(conn.execute("insert into t select generate_series(1,1), 'payload'"))
bg_query = asyncio.create_task(
conn.execute("insert into t select generate_series(1,1), 'payload'")
)
sleep_sec = 2
await asyncio.sleep(sleep_sec)
# also restart ep at segment boundary to make test more interesting
ep.stop()
# it must not be finished yet
# assert not bg_query.done()
assert not bg_query.done()
# Also restart ep at segment boundary to make the test more interesting. Do it in immediate
# mode; fast mode would hang because it tries to gracefully finish sending WAL.
ep.stop(mode="immediate")
# Without segment rename during init (#6402) the previous statement created a
# partially initialized 16MB segment, so sk restart also triggers #6401.
sk.stop().start()

View File

@@ -18,7 +18,7 @@ commands:
- name: postgres-exporter
user: nobody
sysvInitAction: respawn
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres" /bin/postgres_exporter'
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter" /bin/postgres_exporter'
- name: sql-exporter
user: nobody
sysvInitAction: respawn
@@ -93,7 +93,7 @@ files:
target:
# Data source name always has a URI scheme that matches the driver name. In some cases (e.g. MySQL)
# the scheme gets dropped or replaced to match the driver expected DSN format.
data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable'
data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter'
# Collectors (referenced by name) to execute on the target.
# Glob patterns are supported (see <https://pkg.go.dev/path/filepath#Match> for syntax).
@@ -128,7 +128,7 @@ files:
target:
# Data source name always has a URI scheme that matches the driver name. In some cases (e.g. MySQL)
# the scheme gets dropped or replaced to match the driver expected DSN format.
data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable'
data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter_autoscaling'
# Collectors (referenced by name) to execute on the target.
# Glob patterns are supported (see <https://pkg.go.dev/path/filepath#Match> for syntax).