Compare commits

..

57 Commits

Author SHA1 Message Date
Anastasia Lubennikova
c755aafe76 Restart already running rsyslog to update configuration 2025-03-05 16:55:57 +00:00
Anastasia Lubennikova
43afb9e2f4 spawn rsyslog from sysvInit 2025-03-05 16:50:31 +00:00
Anastasia Lubennikova
8d7fde3b07 Fix start rsyslog code to run for read endpoints too 2025-03-05 15:13:37 +00:00
Anastasia Lubennikova
7327e65af7 exclud pg_catalog schema from audit logging 2025-03-05 12:56:32 +00:00
Anastasia Lubennikova
56efce2249 When the ComputeAuditLogLevel is
set to 'Hipaa':

- setup and configure
pgaudit and pgauditlogtofile extensions
in compute_ctl.

- spin up a rsyslog server in the compute VM,
and configure it to send logs to the endpoint
specified in AUDIT_LOGGING_ENDPOINT env.

Change pgaudit.log default to log 'all'.
exclude postgres database from audit logging:
we consider it system database that doesn't contain any sensitive data.

- add pgaudit, pgauditlogtofile to shared_preload_libraries
if audit_log_level Hipaa is enabled

Move rsyslog config to compute_rsyslog_template.conf

Set pgaudit.log_rotation_age
2025-03-05 12:27:39 +00:00
Vlad Lazar
abae7637d6 pageserver: do big reads to fetch slru segment (#11029)
## Problem

Each page of the slru segment is fetched individually when it's loaded
on demand.

## Summary of Changes

Use `Timeline::get_vectored` to fetch 16 at a time.
2025-03-05 11:55:55 +00:00
Anastasia Lubennikova
38a883118a Skip dropping tablesync replication slots on the publisher from branch (#11073)
fixes https://github.com/neondatabase/cloud/issues/24292

Do not drop tablesync replication slots on the publisher, 
when we're in the process of dropping subscriptions inherited by a neon
branch.
Because these slots are still needed by the parent branch subscriptions.

For regular slots we handle this by setting the slot_name to NONE
before calling DROP SUBSCRIPTION, but tablesync slots are not exposed to
SQL.

rely on GUC disable_logical_replication_subscribers=true 
to know that we're in the Neon-specific process of dropping
subscriptions.
2025-03-05 11:29:46 +00:00
Erik Grinaker
40aa4d7151 utils: log Sentry initialization (#11077)
## Problem

We don't have any logging for Sentry initialization. This makes it hard
to verify that it has been configured correctly.

## Summary of changes

Log some basic info when Sentry has been initialized, but omit the
public key (which allows submitting events). Also log when `SENTRY_DSN`
isn't specified at all, and when it fails to initialize (which is
supposed to panic, but we may as well).
2025-03-05 11:23:07 +00:00
Folke Behrens
8e51bfc597 proxy: JSON logging field refactor (#11078)
## Problem

Grafana Loki's JSON handling is somewhat limited and the log message
should be structured in a way that it's easy to sift through logs and
filter.

## Summary of changes

* Drop span_id. It's too short lived to be of value and only bloats the
logs.
* Use the span's name as the object key, but append a unique numeric
value to prevent name collisions.
* Extract interesting span fields into a separate object at the root.

New format:
```json
{
  "timestamp": "2025-03-04T18:54:44.134435Z",
  "level": "INFO",
  "message": "connected to compute node at 127.0.0.1 (127.0.0.1:5432) latency=client: 22.002292ms, cplane: 0ns, compute: 5.338875ms, retry: 0ns",
  "fields": {
    "cold_start_info": "unknown"
  },
  "process_id": 56675,
  "thread_id": 9122892,
  "task_id": "24",
  "target": "proxy::compute",
  "src": "proxy/src/compute.rs:288",
  "trace_id": "5eb89b840ec63fee5fc56cebd633e197",
  "spans": {
    "connect_request#1": {
      "ep": "endpoint",
      "role": "proxy",
      "session_id": "b8a41818-12bd-4c3f-8ef0-9a942cc99514",
      "protocol": "tcp",
      "conn_info": "127.0.0.1"
    },
    "connect_to_compute#6": {},
    "connect_once#8": {
      "compute_id": "compute",
      "pid": "853"
    }
  },
  "extract": {
    "session_id": "b8a41818-12bd-4c3f-8ef0-9a942cc99514"
  }
}
```
2025-03-05 10:27:46 +00:00
Peter Bendel
906d7468cc exclude separate perf tests from bench step (#11084)
## Problem

Our benchmarking workflow has a job step `bench`which runs all tests in
test_runner/performance/* except those that we want to run separately.
We recently added two test cases to that testcase directory that we want
to run separately but forgot to ignore them during the bench step. This
is now causing
[failures](https://github.com/neondatabase/neon/actions/runs/13667689340/job/38212087331#step:7:392).

## Summary of changes

Ignore the separately run tests in the bench step.
2025-03-05 10:14:51 +00:00
Konstantin Knizhnik
438f7bb726 Check response status in prefetch_lookup (#11080)
## Problem

New async prefetch introduces `prefetch+lookup[` function which is
called before LFC lookup to check if prefetch request is already
completed.

This function is not containing now check that response is actually
`T_NeonGetPageResponse` (and not error).

## Summary of changes

Add checks for response tag.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-03-05 10:03:09 +00:00
Peter Bendel
f62ddb11ed Distinguish manually submitted runs for periodic pagebench in grafana dashboard (#11079)
## Problem

Periodic pagebench workflow runs periodically from latest main commit
and also allows to dispatch it manually for a given commit hash to
bi-sect regressions.
However in the dashboards we can not distinguish manual runs from
periodic runs which makes it harder to follow the trend.

## Summary of changes

Send an additional flag commit type to the benchmark runner instance to
distinguish the run type.

Note: this needs a follow-up PR on the receiving side.
2025-03-04 18:11:43 +00:00
Tristan Partin
7b7e4a9fd3 Authorize compute_ctl requests from the control plane (#10530)
The compute should only act if requests come from the control plane.

Signed-off-by: Tristan Partin <tristan@neon.tech>

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-03-04 18:08:00 +00:00
Erik Grinaker
4bbdb758ec compute_tools: appease unused lint on macOS (#11074)
## Problem

On macOS, the `unused` lint complains about two variables not used in
`!linux` builds.

These were introduced in #11007.

## Summary of changes

Appease the linter by explicitly using the variables in `!linux`
branches.
2025-03-04 16:39:32 +00:00
Alex Chi Z.
20af9cef17 fix(test): use the same value for reldir v1+v2 (#11070)
## Problem

part of https://github.com/neondatabase/neon/issues/11067

My observation is that with the current value of settings, x86-v1
usually takes 30s, arm-v1 1m30s, x86-v2 1m, arm-v2 3m. But sometimes the
system could run too slow and cause test to timeout on arm with reldir
v2.

While I investigate what's going on and further improve the performance,
I'd like to set both of them to use the same test input, so that it
doesn't timeout and we don't abuse this test case as a performance test.

## Summary of changes

Use the same settings for both test cases.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-03-04 14:55:50 +00:00
Erik Grinaker
a2902e774a http-utils: generate heap profiles with jemalloc_pprof (#11075)
## Problem

The code to generate symbolized pprof heap profiles and flamegraph SVGs
has been upstreamed to the `jemalloc_pprof` crate:

* https://github.com/polarsignals/rust-jemalloc-pprof/pull/22
* https://github.com/polarsignals/rust-jemalloc-pprof/pull/23

## Summary of changes

Use `jemalloc_pprof` to generate symbolized pprof heap profiles and
flamegraph SVGs.

This reintroduces a bunch of internal jemalloc stack frames that we'd
previously strip, e.g. each stack now always ends with
`prof_backtrace_impl` (where jemalloc takes a stack trace for heap
profiling), but that seems ok.
2025-03-04 12:13:41 +00:00
Vlad Lazar
435bf452e6 tests: remove obsolete err log whitelisting (#11069)
The pageserver read path now supports overlapped in-memory and image
layers via
https://github.com/neondatabase/neon/pull/11000. These allowed errors
are now obsolete.
2025-03-04 08:18:19 +00:00
Erik Grinaker
65addfc524 storcon: add per-tenant rate limiting for API requests (#10924)
## Problem

Incoming requests often take the service lock, and sometimes even do
database transactions. That creates a risk that a rogue client can
starve the controller of the ability to do its primary job of
reconciling tenants to an available state.

## Summary of changes

* Use the `governor` crate to rate limit tenant requests at 10 requests
per second. This is ~10-100x lower than the worst "attack" we've seen
from a client bug. Admin APIs are not rate limited.
* Add a `storage_controller_http_request_rate_limited` histogram for
rate limited requests.
* Log a warning every 10 seconds for rate limited tenants.

The rate limiter is parametrized on TenantId, because the kinds of
client bug we're protecting against generally happen within tenant
scope, and the rates should be somewhat stable: we expect the global
rate of requests to increase as we do more work, but we do not expect
the rate of requests to one tenant to increase.

---------

Co-authored-by: John Spray <john@neon.tech>
2025-03-03 22:04:59 +00:00
Alex Chi Z.
6d0976dad5 feat(pageserver): persist reldir v2 migration status (#10980)
## Problem

part of https://github.com/neondatabase/neon/issues/9516

## Summary of changes

Similar to the aux v2 migration, we persist the relv2 migration status
into index_part, so that even the config item is set to false, we will
still read from the v2 storage to avoid loss of data.

Note that only the two variants `None` and
`Some(RelSizeMigration::Migrating)` are used for now. We don't have full
migration implemented so it will never be set to
`RelSizeMigration::Migrated`.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-03-03 21:05:43 +00:00
Alex Chi Z.
dbf9a80261 fix(pageserver): avoid flooding gc-compaction logs (#11024)
## Problem

The "did not trigger" gets logged at 10k/minute in staging.

## Summary of changes

Change it to debug level.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-03-03 20:23:20 +00:00
Vlad Lazar
6ca49b4d0c safekeeper: fix a gap tracking edge case (#11054)
The interpreted reader tracks a record aligned current position in the
WAL stream.
Partial reads move the stream internally, but not from the pov of the
interpreted WAL reader.

Hence, where new shards subscribe with a start position that matches the
reader's current position,
but we've also done some partial reads. This confuses the gap tracking.
To make it more robust,
update the current batch start to the min between the new start position
and its current value.

Since no record has been decoded yet (position matches), we can't have
lost it
2025-03-03 19:16:03 +00:00
Vlad Lazar
5197e43396 pageserver: add recurse flag to layer download spec (#11068)
I missed updating the open api spec in the original PR.
We need this so that the cplane auto-generated client sees the flag.
2025-03-03 19:04:01 +00:00
Alexander Lakhin
9a4e2eab61 Fix artifact name for build with sanitizers (#11066)
## Problem
When a build is made with sanitizers, this is not reflected in the
artifact name, which can lead to overriding normal builds with sanitized
ones.

## Summary of changes
Take this property of a build into account when constructing the
artifact name.
2025-03-03 18:00:53 +00:00
Vlad Lazar
8298bc903c pageserver: handle in-memory layer overlaps with persistent layers (#11000)
## Problem

Image layers may be nested inside in-memory layers as diagnosed
[here](https://github.com/neondatabase/neon/issues/10720#issuecomment-2649419252).
The read path doesn't support this and may skip over the image layer,
resulting in a failure to reconstruct the page.

## Summary of changes

We already support nesting of image layers inside delta layers. The
logic lives in `LayerMap::select_layer`.
The main goal of this PR is to propagate the candidate in-memory layer
down to that point and update
the selection logic.

Important changes are:
1. Support partial reads for the in-memory layer. Previously, we could
only specify the start LSN of the read.
We need to control the end LSN too.
2. `LayerMap::ranged_search` considers in-memory layers too. Previously,
the search for in-memory layers
was done explicitly in `Timeline::get_reconstruct_data_timeline`. Note
that `LayerMap::ranged_search` now returns
a weak readable layer which the `LayerManager` can upgrade. This dance
is such that we can unit test the layer selection logic.
3. Update `LayerMap::select_layer` to consider the candidate in-memory
layer too

Loosely related drive bys:
1. Remove the "keys not found" tracking in the ranged search. This
wasn't used anywhere and it just complicates things.
2. Remove the difficulty map stuff from the layer map. Again, not used
anywhere.

Closes https://github.com/neondatabase/neon/issues/9185
Closes https://github.com/neondatabase/neon/issues/10720
2025-03-03 17:52:59 +00:00
John Spray
b953daa21f safekeeper: allow remote deletion to proceed after dropped requests (#11042)
## Problem

If a caller times out on safekeeper timeline deletion on a large
timeline, and waits a while before retrying, the deletion will not
progress while the retry is waiting. The net effect is very very slow
deletion as it only proceeds in 30 second bursts across 5 minute idle
periods.

Related: https://github.com/neondatabase/neon/issues/10265

## Summary of changes

- Run remote deletion in a background task
- Carry a watch::Receiver on the Timeline for other callers to join the
wait
- Restart deletion if the API is called again and the previous attempt
failed
2025-03-03 16:03:51 +00:00
Peter Bendel
a07599949f First version of a new benchmark to test larger OLTP workload (#11053)
## Problem

We want to support larger tenants (regarding logical database size,
number of transactions per second etc.) and should increase our test
coverage of OLTP transactions at larger scale.

## Summary of changes

Start a new benchmark that over time will add more OLTP tests at larger
scale.
This PR covers the first version and will be extended in further PRs.

Also fix some infrastructure:
- default for new connections and large tenants is to use connection
pooler pgbouncer, however our fixture always added
`statement_timeout=120` which is not compatible with pooler
[see](https://neon.tech/docs/connect/connection-errors#unsupported-startup-parameter)
- action to create branch timed out after 10 seconds and 10 retries but
for large tenants it can take longer so use increasing back-off for
retries

## Test run

https://github.com/neondatabase/neon/actions/runs/13593446706
2025-03-03 15:25:48 +00:00
Vlad Lazar
38277497fd pageserver: log shutdown at info level for basebackup (#11046)
## Problem

Timeline shutdown during basebackup logs at error level because the the
canecellation error is smushed into BasebackupError::Server.

## Summary of changes
Introduce BasebackupError::Shutdown and use it. `log_query_error` will
now see `QueryError::Shutdown` and log at info level.
2025-03-03 13:46:50 +00:00
Arseny Sher
ef2b50994c walproposer: basic infra to enable generations (#11002)
## Problem

Preparation for https://github.com/neondatabase/neon/issues/10851

## Summary of changes

Add walproposer `safekeepers_generations` field which can be set by
prefixing `neon.safekeepers` GUC with `g#n:`. Non zero value (n) forces
walproposer to use generations. In particular, this also disables
implicit timeline creation as timeline will be created by storcon. Add
test checking this. Also add missing infra: `--safekeepers-generation`
flag to neon_local endpoint start + fix `--start-timeout` flag: it
existed but value wasn't used.
2025-03-03 13:20:20 +00:00
Konstantin Knizhnik
8669bfe493 Do not store zero pages in inmem SMGR for walredo (#11043)
## Problem


See https://neondb.slack.com/archives/C033RQ5SPDH/p1740157873114339

smgrextend for FSM fork is called during page reconstruction by walredo
process causing overflow of inmem SMGR (64 pages).

## Summary of changes

Do not store zero pages in inmem SMGR because `inmem_read` returns zero
page if it is not able to locate specified block.

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-03-03 12:50:07 +00:00
Misha Sakhnov
625c526bdd ci: create multiarch vm images (#11017)
## Problem

We build compute-nodes as multi-arch images, but not the
vm-compute-nodes. The PR adds multiarch vm images the same way as in
autoscaling repo.

## Summary of changes

Add architecture to the matrix for vm compute build steps
Add merge job

---------

Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2025-03-03 11:47:09 +00:00
a-masterov
df0767176a Change the tags names according to the curent state (#11059)
## Problem
We have not synced `force-test-extensions-upgrade.yml` with the last
changes.
The variable `TEST_EXTENSIONS_UPGRADE` was ignored in the script and
actually set to `NEW_COMPUTE_TAG` while it should be set to
`OLD_COMPUTE_TAG` as we are about to run compatibility tests.
## Summary of changes
The tag names were synced, the logic was fixed.
2025-03-03 09:40:49 +00:00
Heikki Linnakangas
38ddfab643 compute_ctl: Perform more startup actions in parallel (#11008)
To speed up compute startup. Resizing swap in particular takes about 100
ms on my laptop. By performing it in parallel with downloading the
basebackup, that latency is effectively hidden. I would imagine that
downloading remote extensions can also take a non-trivial amount of
time, although I didn't try to measure that. In any case that's now also
performed in parallel with downloading the basebackup.
2025-03-03 00:29:37 +00:00
Heikki Linnakangas
066324d6ec compute_ctl: Rearrange startup code (#11007)
Move most of the code to compute.rs, so that all the major startup steps
are visible in one place. You can now get a pretty good picture of what
happens in the latency-critical path at compute startup by reading
ComputeNode::start_compute().

This also clarifies the error handling in start_compute. Previously, the
start_postgres function sometimes returned an Err, and sometimes Ok but
with the compute status already set to Failed. Now the start_compute
function always returns Err on failure, and it's the caller's
responsibility to change the compute status to Failed. Separately from
that, it returns a handle to the Postgres process via a `&mut` reference
if it had already started Postgres (i.e. on success, or if the failure
happens after launching the Postgres process).

---------

Co-authored-by: Alexey Kondratov <kondratov.aleksey@gmail.com>
2025-02-28 22:48:05 +00:00
Suhas Thalanki
ee0c8ca8fd Add -fsigned-char for cross platform signed chars (#10852)
## Problem

In multi-character keys, the GIN index creates a CRC Hash of the first 3
bytes of the key.
The hash can have the first bit to be set or unset, needing to have a
consistent representation
of `char` across architectures for consistent results. GIN stores these
keys by their hashes
which determines the order in which the keys are obtained from the GIN
index.

By default, chars are signed in x86 and unsigned in arm, leading to
inconsistent behavior across different platform architectures. Adding
the `-fsigned-char` flag to the GCC compiler forces chars to be treated
as signed across platforms, ensuring the ordering in which the keys are
obtained consistent.

## Summary of changes

Added `-fsigned-char` to the `CFLAGS` to force GCC to use signed chars
across platforms.
Added a test to check this across platforms.

Fixes: https://github.com/neondatabase/cloud/issues/23199
2025-02-28 21:07:21 +00:00
Ivan Efremov
56033189c1 feat(proxy): Log latency after connect to compute (#11048)
## Problem
To measure latency accurate we should associate the testodrome role
within a latency data

## Summary of changes
Add latency logging to associate different roles within a latency.

Relates to the #22486
2025-02-28 17:58:42 +00:00
Erik Grinaker
d857f63e3b pageserver: fix race that can wedge background tasks (#11047)
## Problem

`wait_for_active_tenant()`, used when starting background tasks, has a
race condition that can cause it to wait forever (until cancelled). It
first checks the current tenant state, and then subscribes for state
updates, but if the state changes between these then it won't be
notified about it.

We've seen this wedge compaction tasks, which can cause unbounded layer
file buildup and read amplification.

## Summary of changes

Use `watch::Receiver::wait_for()` to check both the current and new
tenant states.
2025-02-28 17:00:22 +00:00
Alex Chi Z.
f79ee0bb88 fix(storcon): loop in chaos injection (#11004)
## Problem

Somehow the previous patch loses the loop in the chaos injector function
so everything will only run once.
https://github.com/neondatabase/neon/pull/10934

## Summary of changes

Add back the loop.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-02-28 15:49:15 +00:00
Vlad Lazar
23fb8053c5 storcon: soft disable SK heartbeats (#11041)
## Problem

JWT tokens aren't in place, so all SK heartbeats fail. This is
equivalent to a wait before applying the PS heartbeats and makes things
more flaky.

## Summary of Changes

Add a flag that skips loading SKs from the db on start-up and at
runtime.
2025-02-28 15:49:09 +00:00
Conrad Ludgate
d9ced89ec0 feat(proxy): require TLS to compute if prompted by cplane (#10717)
https://github.com/neondatabase/cloud/issues/23008

For TLS between proxy and compute, we are using an internally
provisioned CA to sign the compute certificates. This change ensures
that proxy will load them from a supplied env var pointing to the
correct file - this file and env var will be configured later, using a
kubernetes secret.

Control plane responds with a `server_name` field if and only if the
compute uses TLS. This server name is the name we use to validate the
certificate. Control plane still sends us the IP to connect to as well
(to support overlay IP).

To support this change, I'd had to split `host` and `host_addr` into
separate fields. Using `host_addr` and bypassing `lookup_addr` if
possible (which is what happens in production). `host` then is only used
for the TLS connection.

There's no blocker to merging this. The code paths will not be triggered
until the new control plane is deployed and the `enableTLS` compute flag
is enabled on a project.
2025-02-28 14:20:25 +00:00
Erik Grinaker
c7ff3c4c9b safekeeper: downgrade interpreted reader errors (#11034)
## Problem

This `critical!` could fire on IO errors, which is just noisy.

Resolves #11027.

## Summary of changes

Downgrade to error, except for decode errors. These could be either data
corruption or a bug, but seem worth investigating either way.
2025-02-28 14:06:56 +00:00
Christian Schwarz
7c53fd0d56 refactor(page_service / timeline::handle): the GateGuard need not be a special case (#11030)
# Changes

While working on
- https://github.com/neondatabase/neon/pull/7202

I found myself needing to cache another expensive Arc::clone inside
inside the timeline::handle::Cache by wrapping it in another Arc.

Before this PR, it seemed like the only expensive thing we were caching
was the
connection handler tasks' clone of `Arc<Timeline>`.

But in fact the GateGuard was another such thing, but it was
special-cased
in the implementation.

So, this refactoring PR de-special-cases the GateGuard.

# Performance

With this PR we are doing strictly _less_ operations per `Cache::get`.
The reason is that we wrap the entire `Types::Timeline` into one Arc.
Before this PR, it was a separate Arc around the Arc<Timeline> and
one around the Arc<GateGuard>.

With this PR, we avoid an allocation per cached item, namely,
the separate Arc around the GateGuard.

This PR does not change the amount of shared mutable state.

So, all in all, it should be a net positive, albeit probably not
noticable
with our small non-NUMA instances and generally high CPU usage per
request.

# Reviewing

To understand the refactoring logistics, look at the changes to the unit
test types first.
Then read the improved module doc comment.
Then the remaining changes.

In the future, we could rename things to be even more generic.
For example, `Types::TenantMgr` could really be a `Types::Resolver`.
And `Types::Timeline` should, to avoid constant confusion in the doc
comment,
be called `Types::Cached` or `Types::Resolved`.
Because the `handle` module, after this PR, really doesn't care that
we're
using it for storing Arc's and GateGuards.

Then again, specicifity is sometimes more useful than being generic.
And writing the module doc comment in a totally generic way would
probably also be more confusing than helpful.
2025-02-28 12:31:52 +00:00
a-masterov
7607686f25 Make test extensions upgrade work with absent images (#11036)
## Problem
CI does not pass for the compute release due to the absence of some
images

## Summary of changes
Now we use the images from the old non-compute releases for non-compute
images
2025-02-28 11:16:22 +00:00
Vlad Lazar
0d6d58bd3e pageserver: make heatmap layer download API more cplane friendly (#10957)
## Problem

We intend for cplane to use the heatmap layer download API to warm up
timelines after unarchival. It's tricky for them to recurse in the
ancestors,
and the current implementation doesn't work well when unarchiving a
chain
of branches and warming them up.

## Summary of changes

* Add a `recurse` flag to the API. When the flag is set, the operation
recurses into the parent
timeline after the current one is done.
* Be resilient to warming up a chain of unarchived branches. Let's say
we unarchived `B` and `C` from
the `A -> B -> C` branch hierarchy. `B` got unarchived first. We
generated the unarchival heatmaps
and stash them in `A` and `B`. When `C` unarchived, it dropped it's
unarchival heatmap since `A` and `B`
already had one. If `C` needed layers from `A` and `B`, it was out of
luck. Now, when choosing whether
to keep an unarchival heatmap we look at its end LSN. If it's more
inclusive than what we currently have,
keep it.
2025-02-28 10:36:53 +00:00
John Spray
55633ebe3a storcon: enable API passthrough to nonzero shards (#11026)
## Problem

Storage controller will proxy GETs to pageserver-like tenant/timeline
paths through to the pageserver.

Usually GET passthroughs make sense to go to shard 0, e.g. if you want
to list timelines.

But sometimes you really want to know about a particular shard, e.g.
reading its cache state or similar.

## Summary of changes

- Accept shard IDs as well as tenant IDs in the passthrough route
- Refactor node lookup to take a shard ID and make the tenant ID case a
layer on top of that. This is one more lock take-drop during these
requests, but it's not particularly expensive and these requests
shouldn't be terribly frequent

This is not immediately used by anything, but will be there any time we
want to e.g. do a pass-through query to check the warmth of a tenant
cache on a particular shard or somesuch.
2025-02-28 08:42:08 +00:00
Heikki Linnakangas
a4b2009800 compute_ctl: Refactor, moving spec_apply functions to spec_apply.rs (#11006)
Seems nice to have the function and all its subroutines in the same
source file.
2025-02-27 20:13:06 +00:00
Alex Chi Z.
ab1f22b7d1 fix(pageserver): correctly access layer map in gc-compaction (#11021)
## Problem

layer_map access was unwrapped. It might return an error during
shutdown.

## Summary of changes

Propagate the layer_map access error back to the compaction loop.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-02-27 16:26:55 +00:00
JC Grünhage
7ed236e17e fix(ci): push prod container images again (#11020)
## Problem
https://github.com/neondatabase/neon/pull/10841 made building compute
and neon images optional on releases that don't need them. The
`push-<component>-image-prod` jobs had transitive dependencies that were
skipped due to that, causing the images not to be pushed to production
registries.

## Summary of changes

Add `!failure() && !cancelled() &&` to the beginning of the conditions
for these jobs to ensure they run even if some of their transitive
dependencies are skipped.
2025-02-27 16:16:14 +00:00
Konstantin Knizhnik
e58f264a05 Increase inmem SMGR size for walredo process to 100 pagees (#10937)
## Problem

We see `Inmem storage overflow` in page server logs:

https://neondb.slack.com/archives/C033RQ5SPDH/p1740157873114339

walked process is using inseam SMGR with storage size limited by 64
pages with warning watermark 32 (based ion the assumption that
XLR_MAX_BLOCK_ID is 32, so WAL record can not access more than 32
pages).

Actually it is not true. We can update up to 3 forks for each block
(including update of FSM and VM forks).

## Summary of changes

This PR increases inseam SMGR size for walled process to 100 pages and
print stack trace in case of overflow.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-02-27 14:31:05 +00:00
Matthias van de Meent
a283edaccf PS/Prefetch: Use a timeout for reading data from TCP (#10834)
This reduces pressure on OS TCP buffers, reducing flush times in other
systems like PageServer.

## Problem

## Summary of changes
2025-02-27 14:00:18 +00:00
a-masterov
ad37199745 Separate the upgrade tests in timelines (#10974)
## Problem
We created extensions in a single database. The tests could interfere,
i.e., discover some service tables left by other extensions and produce
unexpected results.
## Summary of changes
The tests are now run in a separate timeline, so only one extension owns
the database, which prevents interference.
2025-02-27 13:45:18 +00:00
Erik Grinaker
93b59e65a2 pageserver: remove stale comment (#11016)
No longer true now that we eagerly notify the compaction loop.
2025-02-27 12:56:28 +00:00
Christian Schwarz
e35f7758d8 impr(controller_upcall_client): clean up copy-pasta code & add context to retries (#10991)
Before this PR, re-attach and validate would log the same warning
```
calling control plane generation validation API failed
```
on retry errors.

This can be confusing.

This PR makes the message generically valid for any upcall and adds
additional tracing spans to capture context.

Along the way, clean up some copy-pasta variable naming.

refs
-
https://github.com/neondatabase/neon/issues/10381#issuecomment-2684755827

---------

Co-authored-by: Alexander Lakhin <alexander.lakhin@neon.tech>
2025-02-27 10:59:43 +00:00
Peter Bendel
3a3d62dc4f Bodobolero/test cum stats persistence (#10995)
## Problem

So far cumulative statistics have not been persisted when Neon scales to
zero (suspends endpoint).
With PR https://github.com/neondatabase/neon/pull/6560 the cumulative
statistics should now survive endpoint restarts and correctly trigger
the auto- vacuum and auto analyze maintenance

So far we did not have a testcase that validates that improvement in our
dev cloud environment with a real project.

## Summary of changes

Introduce testcase `test_cumulative_statistics_persistence`in the
benchmarking workflow running daily to verify:

- Verifies that the cumulative statistics are correctly persisted across
restarts.
- Cumulative statistics are important to persist across restarts because
they are used
-  when auto-vacuum an auto-analyze trigger conditions are met.
-  The test performs the following steps:
    - Seed a new project using pgbench
    - insert tuples that by itself are not enough to trigger auto-vacuum
    - suspend the endpoint
    - resume the endpoint
- insert additional tuples that by itself are not enough to trigger
auto-vacuum but in combination with the previous tuples are
- verify that autovacuum is triggered by the combination of tuples
inserted before and after endpoint suspension

## Test run


https://github.com/neondatabase/neon/actions/runs/13546879714/job/37860609089#step:6:282
2025-02-27 10:45:13 +00:00
Arpad Müller
a22be5af72 Migrate the last crates to edition 2024 (#10998)
Migrates the remaining crates to edition 2024. We like to stay on the
latest edition if possible. There is no functional changes, however some
code changes had to be done to accommodate the edition's breaking
changes.

Like the previous migration PRs, this is comprised of three commits:

* the first does the edition update and makes `cargo check`/`cargo
clippy` pass. we had to update bindgen to make its output [satisfy the
requirements of edition
2024](https://doc.rust-lang.org/edition-guide/rust-2024/unsafe-extern.html)
* the second commit does a `cargo fmt` for the new style edition.
* the third commit reorders imports as a one-off change. As before, it
is entirely optional.

Part of #10918
2025-02-27 09:40:40 +00:00
Christian Schwarz
f09843ef17 refactor(pageserver): propagate RequestContext to layer downloads (#11001)
For some reason the layer download API never fully got
`RequestContext`-infected.

This PR fixes that as a precursor to
- https://github.com/neondatabase/neon/issues/6107
2025-02-27 09:26:25 +00:00
JC Grünhage
c92a36740b fix(ci): support PR-on-top-of-PR usecase again (#11013)
## Problem
https://github.com/neondatabase/neon/pull/10841 broke CI on PRs that
aren't based on main or a release branch but want to merge into another
PR.

## Summary of changes
Replace `run-kind=pr-main` with `run-kind=pr`, so that all PRs that
aren't release PRs are treated equally.
2025-02-27 09:05:15 +00:00
Arseny Sher
8b86cd1154 safekeeper: follow membership configuration rules (#10781)
## Problem

safekeepers must ignore walproposer messages with non matching
membership conf.

## Summary of changes

Make safekeepers reject vote request, proposer elected and append
request messages with non matching generation. Switch to the
configuration in the greeting message if it is higher.

In passing, fix one comment and WAL truncation.

Last part of https://github.com/neondatabase/neon/issues/9965
2025-02-27 06:13:30 +00:00
250 changed files with 6760 additions and 3499 deletions

View File

@@ -32,3 +32,4 @@ config-variables:
- NEON_DEV_AWS_ACCOUNT_ID
- NEON_PROD_AWS_ACCOUNT_ID
- AWS_ECR_REGION
- BENCHMARK_LARGE_OLTP_PROJECTID

View File

@@ -84,7 +84,13 @@ runs:
--header "Authorization: Bearer ${API_KEY}"
)
role_name=$(echo $roles | jq --raw-output '.roles[] | select(.protected == false) | .name')
role_name=$(echo "$roles" | jq --raw-output '
(.roles | map(select(.protected == false))) as $roles |
if any($roles[]; .name == "neondb_owner")
then "neondb_owner"
else $roles[0].name
end
')
echo "role_name=${role_name}" >> $GITHUB_OUTPUT
env:
API_HOST: ${{ inputs.api_host }}
@@ -107,13 +113,13 @@ runs:
)
if [ -z "${reset_password}" ]; then
sleep 1
sleep $i
continue
fi
password=$(echo $reset_password | jq --raw-output '.role.password')
if [ "${password}" == "null" ]; then
sleep 1
sleep $i # increasing backoff
continue
fi

View File

@@ -44,6 +44,11 @@ inputs:
description: 'Postgres version to use for tests'
required: false
default: 'v16'
sanitizers:
description: 'enabled or disabled'
required: false
default: 'disabled'
type: string
benchmark_durations:
description: 'benchmark durations JSON'
required: false
@@ -59,7 +64,7 @@ runs:
if: inputs.build_type != 'remote'
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build_type }}-artifact
name: neon-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build_type }}${{ inputs.sanitizers == 'enabled' && '-sanitized' || '' }}-artifact
path: /tmp/neon
aws-oicd-role-arn: ${{ inputs.aws-oicd-role-arn }}
@@ -112,6 +117,7 @@ runs:
ALLOW_FORWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'forward compatibility breakage')
RERUN_FAILED: ${{ inputs.rerun_failed }}
PG_VERSION: ${{ inputs.pg_version }}
SANITIZERS: ${{ inputs.sanitizers }}
shell: bash -euxo pipefail {0}
run: |
# PLATFORM will be embedded in the perf test report

View File

@@ -280,7 +280,7 @@ jobs:
- name: Upload Neon artifact
uses: ./.github/actions/upload
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-artifact
name: neon-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}${{ inputs.sanitizers == 'enabled' && '-sanitized' || '' }}-artifact
path: /tmp/neon
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
@@ -347,6 +347,7 @@ jobs:
real_s3_region: eu-central-1
rerun_failed: true
pg_version: ${{ matrix.pg_version }}
sanitizers: ${{ inputs.sanitizers }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
# `--session-timeout` is equal to (timeout-minutes - 10 minutes) * 60 seconds.
# Attempt to stop tests gracefully to generate test reports
@@ -359,7 +360,6 @@ jobs:
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
PAGESERVER_GET_VECTORED_CONCURRENT_IO: sidecar-task
USE_LFC: ${{ matrix.lfc_state == 'with-lfc' && 'true' || 'false' }}
SANITIZERS: ${{ inputs.sanitizers }}
# Temporary disable this step until we figure out why it's so flaky
# Ref https://github.com/neondatabase/neon/issues/4540

View File

@@ -19,7 +19,7 @@ on:
description: "Tag of the last compute release"
value: ${{ jobs.tags.outputs.compute }}
run-kind:
description: "The kind of run we're currently in. Will be one of `pr-main`, `push-main`, `storage-rc`, `storage-release`, `proxy-rc`, `proxy-release`, `compute-rc`, `compute-release` or `merge_queue`"
description: "The kind of run we're currently in. Will be one of `pr`, `push-main`, `storage-rc`, `storage-release`, `proxy-rc`, `proxy-release`, `compute-rc`, `compute-release` or `merge_queue`"
value: ${{ jobs.tags.outputs.run-kind }}
permissions: {}
@@ -51,10 +51,10 @@ jobs:
|| (inputs.github-event-name == 'push' && github.ref_name == 'release') && 'storage-release'
|| (inputs.github-event-name == 'push' && github.ref_name == 'release-compute') && 'compute-release'
|| (inputs.github-event-name == 'push' && github.ref_name == 'release-proxy') && 'proxy-release'
|| (inputs.github-event-name == 'pull_request' && github.base_ref == 'main') && 'pr-main'
|| (inputs.github-event-name == 'pull_request' && github.base_ref == 'release') && 'storage-rc-pr'
|| (inputs.github-event-name == 'pull_request' && github.base_ref == 'release-compute') && 'compute-rc-pr'
|| (inputs.github-event-name == 'pull_request' && github.base_ref == 'release-proxy') && 'proxy-rc-pr'
|| (inputs.github-event-name == 'pull_request') && 'pr'
|| 'unknown'
}}
run: |
@@ -81,7 +81,7 @@ jobs:
compute-release)
echo "tag=release-compute-$(git rev-list --count HEAD)" | tee -a $GITHUB_OUTPUT
;;
pr-main|storage-rc-pr|compute-rc-pr|proxy-rc-pr)
pr|storage-rc-pr|compute-rc-pr|proxy-rc-pr)
BUILD_AND_TEST_RUN_ID=$(gh run list -b $CURRENT_BRANCH -c $CURRENT_SHA -w 'Build and Test' -L 1 --json databaseId --jq '.[].databaseId')
echo "tag=$BUILD_AND_TEST_RUN_ID" | tee -a $GITHUB_OUTPUT
;;

View File

@@ -140,6 +140,9 @@ jobs:
--ignore test_runner/performance/test_logical_replication.py
--ignore test_runner/performance/test_physical_replication.py
--ignore test_runner/performance/test_perf_ingest_using_pgcopydb.py
--ignore test_runner/performance/test_cumulative_statistics_persistence.py
--ignore test_runner/performance/test_perf_many_relations.py
--ignore test_runner/performance/test_perf_oltp_large_tenant.py
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -171,6 +174,61 @@ jobs:
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
cumstats-test:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 17
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: "neon-staging"
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
options: --init
steps:
- uses: actions/checkout@v4
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Verify that cumulative statistics are preserved
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance/test_cumulative_statistics_persistence.py
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 3600
pg_version: ${{ env.DEFAULT_PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
NEON_API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
replication-tests:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
permissions:

View File

@@ -487,7 +487,7 @@ jobs:
neon-image-arch:
needs: [ check-permissions, build-build-tools-image, meta ]
if: ${{ contains(fromJSON('["push-main", "pr-main", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
strategy:
matrix:
arch: [ x64, arm64 ]
@@ -537,7 +537,7 @@ jobs:
neon-image:
needs: [ neon-image-arch, meta ]
if: ${{ contains(fromJSON('["push-main", "pr-main", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
runs-on: ubuntu-22.04
permissions:
id-token: write # aws-actions/configure-aws-credentials
@@ -559,7 +559,7 @@ jobs:
compute-node-image-arch:
needs: [ check-permissions, build-build-tools-image, meta ]
if: ${{ contains(fromJSON('["push-main", "pr-main", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
@@ -651,7 +651,7 @@ jobs:
compute-node-image:
needs: [ compute-node-image-arch, meta ]
if: ${{ contains(fromJSON('["push-main", "pr-main", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
@@ -692,15 +692,15 @@ jobs:
neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{ needs.meta.outputs.build-tag }}-${{ matrix.version.debian }}-x64 \
neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{ needs.meta.outputs.build-tag }}-${{ matrix.version.debian }}-arm64
vm-compute-node-image:
vm-compute-node-image-arch:
needs: [ check-permissions, meta, compute-node-image ]
if: ${{ contains(fromJSON('["push-main", "pr-main", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
runs-on: [ self-hosted, large ]
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'large-arm64' || 'large')) }}
strategy:
fail-fast: false
matrix:
arch: [ amd64, arm64 ]
version:
# see the comment for `compute-node-image-arch` job
- pg: v14
debian: bullseye
- pg: v15
@@ -717,7 +717,7 @@ jobs:
- name: Downloading vm-builder
run: |
curl -fL https://github.com/neondatabase/autoscaling/releases/download/$VM_BUILDER_VERSION/vm-builder-amd64 -o vm-builder
curl -fL https://github.com/neondatabase/autoscaling/releases/download/$VM_BUILDER_VERSION/vm-builder-${{ matrix.arch }} -o vm-builder
chmod +x vm-builder
- uses: neondatabase/dev-actions/set-docker-config-dir@6094485bf440001c94a94a3f9e221e81ff6b6193
@@ -738,12 +738,37 @@ jobs:
-size=2G \
-spec=compute/vm-image-spec-${{ matrix.version.debian }}.yaml \
-src=neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.meta.outputs.build-tag }} \
-dst=neondatabase/vm-compute-node-${{ matrix.version.pg }}:${{ needs.meta.outputs.build-tag }} \
-target-arch=linux/amd64
-dst=neondatabase/vm-compute-node-${{ matrix.version.pg }}:${{ needs.meta.outputs.build-tag }}-${{ matrix.arch }} \
-target-arch=linux/${{ matrix.arch }}
- name: Pushing vm-compute-node image
run: |
docker push neondatabase/vm-compute-node-${{ matrix.version.pg }}:${{ needs.meta.outputs.build-tag }}
docker push neondatabase/vm-compute-node-${{ matrix.version.pg }}:${{ needs.meta.outputs.build-tag }}-${{ matrix.arch }}
vm-compute-node-image:
needs: [ vm-compute-node-image-arch, meta ]
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
runs-on: ubuntu-22.04
strategy:
matrix:
version:
# see the comment for `compute-node-image-arch` job
- pg: v14
- pg: v15
- pg: v16
- pg: v17
steps:
- uses: docker/login-action@v3
with:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
- name: Create multi-arch compute-node image
run: |
docker buildx imagetools create -t neondatabase/vm-compute-node-${{ matrix.version.pg }}:${{ needs.meta.outputs.build-tag }} \
neondatabase/vm-compute-node-${{ matrix.version.pg }}:${{ needs.meta.outputs.build-tag }}-amd64 \
neondatabase/vm-compute-node-${{ matrix.version.pg }}:${{ needs.meta.outputs.build-tag }}-arm64
test-images:
needs: [ check-permissions, meta, neon-image, compute-node-image ]
@@ -775,7 +800,7 @@ jobs:
# Ensure that we don't have bad versions.
- name: Verify image versions
shell: bash # ensure no set -e for better error messages
if: ${{ contains(fromJSON('["push-main", "pr-main", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
run: |
pageserver_version=$(docker run --rm neondatabase/neon:${{ needs.meta.outputs.build-tag }} "/bin/sh" "-c" "/usr/local/bin/pageserver --version")
@@ -823,15 +848,15 @@ jobs:
- name: Test extension upgrade
timeout-minutes: 20
if: ${{ contains(fromJSON('["pr-main", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
env:
TAG: >-
${{
false
|| needs.meta.outputs.run-kind == 'pr-main' && needs.meta.outputs.build-tag
|| needs.meta.outputs.run-kind == 'pr' && needs.meta.outputs.build-tag
|| needs.meta.outputs.run-kind == 'compute-rc-pr' && needs.meta.outputs.previous-storage-release
}}
TEST_EXTENSIONS_TAG: latest
TEST_EXTENSIONS_TAG: ${{ needs.meta.outputs.previous-compute-release }}
NEW_COMPUTE_TAG: ${{ needs.meta.outputs.build-tag }}
OLD_COMPUTE_TAG: ${{ needs.meta.outputs.previous-compute-release }}
run: ./docker-compose/test_extensions_upgrade.sh
@@ -870,7 +895,7 @@ jobs:
push-neon-image-dev:
needs: [ meta, generate-image-maps, neon-image ]
if: ${{ contains(fromJSON('["push-main", "pr-main", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
uses: ./.github/workflows/_push-to-container-registry.yml
permissions:
id-token: write # Required for aws/azure login
@@ -888,7 +913,7 @@ jobs:
push-compute-image-dev:
needs: [ meta, generate-image-maps, vm-compute-node-image ]
if: ${{ contains(fromJSON('["push-main", "pr-main", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
uses: ./.github/workflows/_push-to-container-registry.yml
permissions:
id-token: write # Required for aws/azure login
@@ -906,7 +931,8 @@ jobs:
push-neon-image-prod:
needs: [ meta, generate-image-maps, neon-image, test-images ]
if: ${{ contains(fromJSON('["storage-release", "proxy-release"]'), needs.meta.outputs.run-kind) }}
# Depends on jobs that can get skipped
if: ${{ !failure() && !cancelled() && contains(fromJSON('["storage-release", "proxy-release"]'), needs.meta.outputs.run-kind) }}
uses: ./.github/workflows/_push-to-container-registry.yml
permissions:
id-token: write # Required for aws/azure login
@@ -924,7 +950,8 @@ jobs:
push-compute-image-prod:
needs: [ meta, generate-image-maps, vm-compute-node-image, test-images ]
if: ${{ needs.meta.outputs.run-kind == 'compute-release' }}
# Depends on jobs that can get skipped
if: ${{ !failure() && !cancelled() && needs.meta.outputs.run-kind == 'compute-release' }}
uses: ./.github/workflows/_push-to-container-registry.yml
permissions:
id-token: write # Required for aws/azure login
@@ -955,7 +982,7 @@ jobs:
trigger-custom-extensions-build-and-wait:
needs: [ check-permissions, meta ]
if: ${{ contains(fromJSON('["push-main", "pr-main", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
runs-on: ubuntu-22.04
permissions:
id-token: write # aws-actions/configure-aws-credentials
@@ -1347,7 +1374,7 @@ jobs:
|| needs.check-codestyle-python.result == 'skipped'
|| needs.check-codestyle-rust.result == 'skipped'
|| needs.files-changed.result == 'skipped'
|| (needs.push-compute-image-dev.result == 'skipped' && contains(fromJSON('["push-main", "pr-main", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|| (needs.push-neon-image-dev.result == 'skipped' && contains(fromJSON('["push-main", "pr-main", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind))
|| (needs.push-compute-image-dev.result == 'skipped' && contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|| (needs.push-neon-image-dev.result == 'skipped' && contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind))
|| needs.test-images.result == 'skipped'
|| (needs.trigger-custom-extensions-build-and-wait.result == 'skipped' && contains(fromJSON('["push-main", "pr-main", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|| (needs.trigger-custom-extensions-build-and-wait.result == 'skipped' && contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind))

View File

@@ -52,8 +52,9 @@ jobs:
- name: Test extension upgrade
timeout-minutes: 20
env:
NEWTAG: latest
OLDTAG: ${{ steps.get-last-compute-release-tag.outputs.tag }}
NEW_COMPUTE_TAG: latest
OLD_COMPUTE_TAG: ${{ steps.get-last-compute-release-tag.outputs.tag }}
TEST_EXTENSIONS_TAG: ${{ steps.get-last-compute-release-tag.outputs.tag }}
PG_VERSION: ${{ matrix.pg-version }}
FORCE_ALL_UPGRADE_TESTS: true
run: ./docker-compose/test_extensions_upgrade.sh

View File

@@ -0,0 +1,147 @@
name: large oltp benchmark
on:
# uncomment to run on push for debugging your PR
push:
branches: [ bodobolero/synthetic_oltp_workload ]
schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '0 15 * * *' # run once a day, timezone is utc, avoid conflict with other benchmarks
workflow_dispatch: # adds ability to run this manually
defaults:
run:
shell: bash -euxo pipefail {0}
concurrency:
# Allow only one workflow globally because we need dedicated resources which only exist once
group: large-oltp-bench-workflow
cancel-in-progress: true
jobs:
oltp:
strategy:
fail-fast: false # allow other variants to continue even if one fails
matrix:
include:
- target: new_branch
custom_scripts: insert_webhooks.sql@2 select_any_webhook_with_skew.sql@4 select_recent_webhook.sql@4
- target: reuse_branch
custom_scripts: insert_webhooks.sql@2 select_any_webhook_with_skew.sql@4 select_recent_webhook.sql@4
max-parallel: 1 # we want to run each stripe size sequentially to be able to compare the results
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "1h" # todo update to > 1 h
TEST_PGBENCH_CUSTOM_SCRIPTS: ${{ matrix.custom_scripts }}
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
PG_VERSION: 16 # pre-determined by pre-determined project
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.ref_name == 'main' }}
PLATFORM: ${{ matrix.target }}
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
options: --init
# Increase timeout to 8h, default timeout is 6h
timeout-minutes: 480
steps:
- uses: actions/checkout@v4
- name: Configure AWS credentials # necessary to download artefacts
uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours is currently max associated with IAM role
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Create Neon Branch for large tenant
if: ${{ matrix.target == 'new_branch' }}
id: create-neon-branch-oltp-target
uses: ./.github/actions/neon-branch-create
with:
project_id: ${{ vars.BENCHMARK_LARGE_OLTP_PROJECTID }}
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Set up Connection String
id: set-up-connstr
run: |
case "${{ matrix.target }}" in
new_branch)
CONNSTR=${{ steps.create-neon-branch-oltp-target.outputs.dsn }}
;;
reuse_branch)
CONNSTR=${{ secrets.BENCHMARK_LARGE_OLTP_REUSE_CONNSTR }}
;;
*)
echo >&2 "Unknown target=${{ matrix.target }}"
exit 1
;;
esac
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
- name: Benchmark pgbench with custom-scripts
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_perf_oltp_large_tenant
pg_version: ${{ env.PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Delete Neon Branch for large tenant
if: ${{ always() && matrix.target == 'new_branch' }}
uses: ./.github/actions/neon-branch-delete
with:
project_id: ${{ vars.BENCHMARK_LARGE_OLTP_PROJECTID }}
branch_id: ${{ steps.create-neon-branch-oltp-target.outputs.branch_id }}
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Create Allure report
id: create-allure-report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@v1
with:
channel-id: "C06KHQVQ7U3" # on-call-qa-staging-stream
slack-message: |
Periodic large oltp perf testing: ${{ job.status }}
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
<${{ steps.create-allure-report.outputs.report-url }}|Allure report>
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

View File

@@ -78,8 +78,10 @@ jobs:
run: |
if [ -z "$INPUT_COMMIT_HASH" ]; then
echo "COMMIT_HASH=$(curl -s https://api.github.com/repos/neondatabase/neon/commits/main | jq -r '.sha')" >> $GITHUB_ENV
echo "COMMIT_HASH_TYPE=latest" >> $GITHUB_ENV
else
echo "COMMIT_HASH=$INPUT_COMMIT_HASH" >> $GITHUB_ENV
echo "COMMIT_HASH_TYPE=manual" >> $GITHUB_ENV
fi
- name: Start Bench with run_id
@@ -89,7 +91,7 @@ jobs:
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-H "Authorization: Bearer $API_KEY" \
-d "{\"neonRepoCommitHash\": \"${COMMIT_HASH}\"}"
-d "{\"neonRepoCommitHash\": \"${COMMIT_HASH}\", \"neonRepoCommitHashType\": \"${COMMIT_HASH_TYPE}\"}"
- name: Poll Test Status
id: poll_step

187
Cargo.lock generated
View File

@@ -783,6 +783,28 @@ dependencies = [
"tracing",
]
[[package]]
name = "axum-extra"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "460fc6f625a1f7705c6cf62d0d070794e94668988b1c38111baeec177c715f7b"
dependencies = [
"axum",
"axum-core",
"bytes",
"futures-util",
"headers",
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"mime",
"pin-project-lite",
"serde",
"tower 0.5.2",
"tower-layer",
"tower-service",
]
[[package]]
name = "azure_core"
version = "0.21.0"
@@ -925,9 +947,9 @@ checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5"
[[package]]
name = "base64"
version = "0.21.1"
version = "0.21.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f1e31e207a6b8fb791a38ea3105e6cb541f55e4d029902d3039a4ad07cc4105"
checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
[[package]]
name = "base64"
@@ -984,9 +1006,9 @@ dependencies = [
[[package]]
name = "bindgen"
version = "0.70.1"
version = "0.71.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f49d8fed880d473ea71efb9bf597651e77201bdd4893efe54c9e5d65ae04ce6f"
checksum = "5f58bf3d7db68cfbac37cfc485a8d711e87e064c3d0fe0435b92f7a407f9d6b3"
dependencies = [
"bitflags 2.8.0",
"cexpr",
@@ -997,7 +1019,7 @@ dependencies = [
"proc-macro2",
"quote",
"regex",
"rustc-hash",
"rustc-hash 2.1.1",
"shlex",
"syn 2.0.90",
]
@@ -1305,6 +1327,7 @@ dependencies = [
"aws-sdk-s3",
"aws-smithy-types",
"axum",
"axum-extra",
"base64 0.13.1",
"bytes",
"camino",
@@ -1316,6 +1339,7 @@ dependencies = [
"flate2",
"futures",
"http 1.1.0",
"jsonwebtoken",
"metrics",
"nix 0.27.1",
"notify",
@@ -2297,7 +2321,7 @@ name = "framed-websockets"
version = "0.1.0"
source = "git+https://github.com/neondatabase/framed-websockets#34eff3d6f8cfccbc5f35e4f65314ff7328621127"
dependencies = [
"base64 0.21.1",
"base64 0.21.7",
"bytemuck",
"bytes",
"futures-core",
@@ -2410,9 +2434,9 @@ checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
[[package]]
name = "futures-timer"
version = "3.0.2"
version = "3.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24"
[[package]]
name = "futures-util"
@@ -2515,6 +2539,27 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "governor"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "842dc78579ce01e6a1576ad896edc92fca002dd60c9c3746b7fc2bec6fb429d0"
dependencies = [
"cfg-if",
"dashmap 6.1.0",
"futures-sink",
"futures-timer",
"futures-util",
"no-std-compat",
"nonzero_ext",
"parking_lot 0.12.1",
"portable-atomic",
"quanta",
"rand 0.8.5",
"smallvec",
"spinning_top",
]
[[package]]
name = "group"
version = "0.12.1"
@@ -2632,7 +2677,7 @@ version = "7.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d"
dependencies = [
"base64 0.21.1",
"base64 0.21.7",
"byteorder",
"crossbeam-channel",
"flate2",
@@ -2640,6 +2685,30 @@ dependencies = [
"num-traits",
]
[[package]]
name = "headers"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9"
dependencies = [
"base64 0.21.7",
"bytes",
"headers-core",
"http 1.1.0",
"httpdate",
"mime",
"sha1",
]
[[package]]
name = "headers-core"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4"
dependencies = [
"http 1.1.0",
]
[[package]]
name = "heck"
version = "0.5.0"
@@ -2777,12 +2846,9 @@ name = "http-utils"
version = "0.1.0"
dependencies = [
"anyhow",
"backtrace",
"bytes",
"fail",
"flate2",
"hyper 0.14.30",
"inferno 0.12.0",
"itertools 0.10.5",
"jemalloc_pprof",
"metrics",
@@ -3281,9 +3347,9 @@ checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
[[package]]
name = "jemalloc_pprof"
version = "0.6.0"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a883828bd6a4b957cd9f618886ff19e5f3ebd34e06ba0e855849e049fef32fb"
checksum = "5622af6d21ff86ed7797ef98e11b8f302da25ec69a7db9f6cde8e2e1c8df9992"
dependencies = [
"anyhow",
"libc",
@@ -3367,7 +3433,7 @@ version = "9.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c7ea04a7c5c055c175f189b6dc6ba036fd62306b58c66c9f6389036c503a3f4"
dependencies = [
"base64 0.21.1",
"base64 0.21.7",
"js-sys",
"pem",
"ring",
@@ -3482,9 +3548,9 @@ dependencies = [
[[package]]
name = "mappings"
version = "0.6.0"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce9229c438fbf1c333926e2053c4c091feabbd40a1b590ec62710fea2384af9e"
checksum = "e434981a332777c2b3062652d16a55f8e74fa78e6b1882633f0d77399c84fc2a"
dependencies = [
"anyhow",
"libc",
@@ -3537,7 +3603,7 @@ dependencies = [
"measured-derive",
"memchr",
"parking_lot 0.12.1",
"rustc-hash",
"rustc-hash 1.1.0",
"ryu",
]
@@ -3725,6 +3791,12 @@ dependencies = [
"memoffset 0.9.0",
]
[[package]]
name = "no-std-compat"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c"
[[package]]
name = "nom"
version = "7.1.3"
@@ -3735,6 +3807,12 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "nonzero_ext"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
[[package]]
name = "notify"
version = "8.0.0"
@@ -4307,9 +4385,9 @@ dependencies = [
[[package]]
name = "papaya"
version = "0.1.8"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc7c76487f7eaa00a0fc1d7f88dc6b295aec478d11b0fc79f857b62c2874124c"
checksum = "aab21828b6b5952fdadd6c377728ffae53ec3a21b2febc47319ab65741f7e2fd"
dependencies = [
"equivalent",
"seize",
@@ -4437,7 +4515,7 @@ version = "3.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b8fcc794035347fb64beda2d3b462595dd2753e3f268d89c5aae77e8cf2c310"
dependencies = [
"base64 0.21.1",
"base64 0.21.7",
"serde",
]
@@ -4591,6 +4669,12 @@ dependencies = [
"never-say-never",
]
[[package]]
name = "portable-atomic"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "280dc24453071f1b63954171985a0b0d30058d287960968b9b2aca264c8d4ee6"
[[package]]
name = "postgres"
version = "0.19.7"
@@ -4755,12 +4839,14 @@ dependencies = [
[[package]]
name = "pprof_util"
version = "0.6.0"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65c568b3f8c1c37886ae07459b1946249e725c315306b03be5632f84c239f781"
checksum = "9fa015c78eed2130951e22c58d2095849391e73817ab2e74f71b0b9f63dd8416"
dependencies = [
"anyhow",
"backtrace",
"flate2",
"inferno 0.12.0",
"num",
"paste",
"prost",
@@ -5012,7 +5098,7 @@ dependencies = [
"reqwest-tracing",
"rsa",
"rstest",
"rustc-hash",
"rustc-hash 1.1.0",
"rustls 0.23.18",
"rustls-native-certs 0.8.0",
"rustls-pemfile 2.1.1",
@@ -5052,6 +5138,21 @@ dependencies = [
"zerocopy",
]
[[package]]
name = "quanta"
version = "0.12.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3bd1fe6824cea6538803de3ff1bc0cf3949024db3d43c9643024bfb33a807c0e"
dependencies = [
"crossbeam-utils",
"libc",
"once_cell",
"raw-cpuid",
"wasi 0.11.0+wasi-snapshot-preview1",
"web-sys",
"winapi",
]
[[package]]
name = "quick-xml"
version = "0.26.0"
@@ -5182,6 +5283,15 @@ dependencies = [
"num-traits",
]
[[package]]
name = "raw-cpuid"
version = "11.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6928fa44c097620b706542d428957635951bade7143269085389d42c8a4927e"
dependencies = [
"bitflags 2.8.0",
]
[[package]]
name = "rayon"
version = "1.7.0"
@@ -5630,6 +5740,12 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc-hash"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
[[package]]
name = "rustc_version"
version = "0.4.0"
@@ -5746,7 +5862,7 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b"
dependencies = [
"base64 0.21.1",
"base64 0.21.7",
]
[[package]]
@@ -5755,7 +5871,7 @@ version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f48172685e6ff52a556baa527774f61fcaa884f59daf3375c62a3f1cd2549dab"
dependencies = [
"base64 0.21.1",
"base64 0.21.7",
"rustls-pki-types",
]
@@ -5994,9 +6110,9 @@ dependencies = [
[[package]]
name = "seize"
version = "0.4.9"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d84b0c858bdd30cb56f5597f8b3bf702ec23829e652cc636a1e5a7b9de46ae93"
checksum = "e4b8d813387d566f627f3ea1b914c068aac94c40ae27ec43f5f33bde65abefe7"
dependencies = [
"libc",
"windows-sys 0.52.0",
@@ -6389,6 +6505,15 @@ version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
[[package]]
name = "spinning_top"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300"
dependencies = [
"lock_api",
]
[[package]]
name = "spki"
version = "0.6.0"
@@ -6465,6 +6590,7 @@ dependencies = [
"diesel_migrations",
"fail",
"futures",
"governor",
"hex",
"http-utils",
"humantime",
@@ -7279,10 +7405,12 @@ version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "403fa3b783d4b626a8ad51d766ab03cb6d2dbfc46b1c5d4448395e6628dc9697"
dependencies = [
"base64 0.22.1",
"bitflags 2.8.0",
"bytes",
"http 1.1.0",
"http-body 1.0.0",
"mime",
"pin-project-lite",
"tower-layer",
"tower-service",
@@ -7636,7 +7764,6 @@ dependencies = [
"anyhow",
"arc-swap",
"async-compression",
"backtrace",
"bincode",
"byteorder",
"bytes",
@@ -8190,7 +8317,7 @@ dependencies = [
"ahash",
"anyhow",
"base64 0.13.1",
"base64 0.21.1",
"base64 0.21.7",
"base64ct",
"bytes",
"camino",

View File

@@ -43,7 +43,7 @@ members = [
]
[workspace.package]
edition = "2021"
edition = "2024"
license = "Apache-2.0"
## All dependency versions, used in the project
@@ -53,7 +53,6 @@ anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
atomic-take = "1.1.0"
backtrace = "0.3.74"
flate2 = "1.0.26"
assert-json-diff = "2"
async-stream = "0.3"
@@ -68,9 +67,10 @@ aws-credential-types = "1.2.0"
aws-sigv4 = { version = "1.2", features = ["sign-http"] }
aws-types = "1.3"
axum = { version = "0.8.1", features = ["ws"] }
axum-extra = { version = "0.10.0", features = ["typed-header"] }
base64 = "0.13.0"
bincode = "1.3"
bindgen = "0.70"
bindgen = "0.71"
bit_field = "0.10.2"
bstr = "1.0"
byteorder = "1.4"
@@ -95,6 +95,7 @@ futures = "0.3"
futures-core = "0.3"
futures-util = "0.3"
git-version = "0.3"
governor = "0.8"
hashbrown = "0.14"
hashlink = "0.9.1"
hdrhistogram = "7.5.2"
@@ -113,11 +114,10 @@ hyper-util = "0.1"
tokio-tungstenite = "0.21.0"
indexmap = "2"
indoc = "2"
inferno = "0.12.0"
ipnet = "2.10.0"
itertools = "0.10"
itoa = "1.0.11"
jemalloc_pprof = "0.6"
jemalloc_pprof = { version = "0.7", features = ["symbolize", "flamegraph"] }
jsonwebtoken = "9"
lasso = "0.7"
libc = "0.2"
@@ -192,7 +192,7 @@ toml = "0.8"
toml_edit = "0.22"
tonic = {version = "0.12.3", default-features = false, features = ["channel", "tls", "tls-roots"]}
tower = { version = "0.5.2", default-features = false }
tower-http = { version = "0.6.2", features = ["request-id", "trace"] }
tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] }
# This revision uses opentelemetry 0.27. There's no tag for it.
tower-otel = { git = "https://github.com/mattiapenati/tower-otel", rev = "56a7321053bcb72443888257b622ba0d43a11fcd" }

View File

@@ -11,15 +11,16 @@ ICU_PREFIX_DIR := /usr/local/icu
#
BUILD_TYPE ?= debug
WITH_SANITIZERS ?= no
PG_CFLAGS = -fsigned-char
ifeq ($(BUILD_TYPE),release)
PG_CONFIGURE_OPTS = --enable-debug --with-openssl
PG_CFLAGS = -O2 -g3 $(CFLAGS)
PG_CFLAGS += -O2 -g3 $(CFLAGS)
PG_LDFLAGS = $(LDFLAGS)
# Unfortunately, `--profile=...` is a nightly feature
CARGO_BUILD_FLAGS += --release
else ifeq ($(BUILD_TYPE),debug)
PG_CONFIGURE_OPTS = --enable-debug --with-openssl --enable-cassert --enable-depend
PG_CFLAGS = -O0 -g3 $(CFLAGS)
PG_CFLAGS += -O0 -g3 $(CFLAGS)
PG_LDFLAGS = $(LDFLAGS)
else
$(error Bad build type '$(BUILD_TYPE)', see Makefile for options)
@@ -159,6 +160,8 @@ postgres-%: postgres-configure-% \
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_visibility install
+@echo "Compiling pageinspect $*"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pageinspect install
+@echo "Compiling pg_trgm $*"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_trgm install
+@echo "Compiling amcheck $*"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/amcheck install
+@echo "Compiling test_decoding $*"

View File

@@ -162,7 +162,7 @@ FROM build-deps AS pg-build
ARG PG_VERSION
COPY vendor/postgres-${PG_VERSION:?} postgres
RUN cd postgres && \
export CONFIGURE_CMD="./configure CFLAGS='-O2 -g3' --enable-debug --with-openssl --with-uuid=ossp \
export CONFIGURE_CMD="./configure CFLAGS='-O2 -g3 -fsigned-char' --enable-debug --with-openssl --with-uuid=ossp \
--with-icu --with-libxml --with-libxslt --with-lz4" && \
if [ "${PG_VERSION:?}" != "v14" ]; then \
# zstd is available only from PG15
@@ -1933,6 +1933,7 @@ RUN apt update && \
locales \
procps \
ca-certificates \
rsyslog \
$VERSION_INSTALLS && \
apt clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8
@@ -1978,6 +1979,15 @@ COPY --from=sql_exporter_preprocessor --chmod=0644 /home/nonroot/compute/etc/neo
# Make the libraries we built available
RUN echo '/usr/local/lib' >> /etc/ld.so.conf && /sbin/ldconfig
# rsyslog config permissions
RUN chown postgres:postgres /etc/rsyslog.conf && \
touch /etc/compute_rsyslog.conf && \
chown -R postgres:postgres /etc/compute_rsyslog.conf && \
# directory for rsyslogd pid file
mkdir /var/run/rsyslogd && \
chown -R postgres:postgres /var/run/rsyslogd
ENV LANG=en_US.utf8
USER postgres
ENTRYPOINT ["/usr/local/bin/compute_ctl"]

View File

@@ -39,6 +39,10 @@ commands:
user: nobody
sysvInitAction: respawn
shell: '/bin/sql_exporter -config.file=/etc/sql_exporter_autoscaling.yml -web.listen-address=:9499'
- name: rsyslogd
user: postgres
sysvInitAction: respawn
shell: '/usr/sbin/rsyslogd -i /var/run/rsyslogd/rsyslogd.pid -f /etc/compute_rsyslog.conf'
shutdownHook: |
su -p postgres --session-command '/usr/local/bin/pg_ctl stop -D /var/db/postgres/compute/pgdata -m fast --wait -t 10'
files:
@@ -54,7 +58,7 @@ files:
# regardless of hostname (ALL)
#
# Also allow it to shut down the VM. The fast_import job does that when it's finished.
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff, /usr/sbin/rsyslogd
- filename: cgconfig.conf
content: |
# Configuration for cgroups in VM compute nodes
@@ -69,6 +73,11 @@ files:
}
memory {}
}
# Create dummy rsyslog config, because it refuses to start without at least one action configured.
# compute_ctl will rewrite this file with the actual configuration, if needed.
- filename: compute_rsyslog.conf
content: |
*.* /dev/null
build: |
# Build cgroup-tools
#
@@ -132,6 +141,10 @@ merge: |
RUN set -e \
&& chmod 0644 /etc/cgconfig.conf
COPY compute_rsyslog.conf /etc/compute_rsyslog.conf
RUN set -e \
&& chmod 0644 /etc/compute_rsyslog.conf
COPY --from=libcgroup-builder /libcgroup-install/bin/* /usr/bin/
COPY --from=libcgroup-builder /libcgroup-install/lib/* /usr/lib/
COPY --from=libcgroup-builder /libcgroup-install/sbin/* /usr/sbin/

View File

@@ -39,6 +39,10 @@ commands:
user: nobody
sysvInitAction: respawn
shell: '/bin/sql_exporter -config.file=/etc/sql_exporter_autoscaling.yml -web.listen-address=:9499'
- name: rsyslogd
user: postgres
sysvInitAction: respawn
shell: '/usr/sbin/rsyslogd -i /var/run/rsyslogd/rsyslogd.pid -f /etc/compute_rsyslog.conf'
shutdownHook: |
su -p postgres --session-command '/usr/local/bin/pg_ctl stop -D /var/db/postgres/compute/pgdata -m fast --wait -t 10'
files:
@@ -54,7 +58,7 @@ files:
# regardless of hostname (ALL)
#
# Also allow it to shut down the VM. The fast_import job does that when it's finished.
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff, /usr/sbin/rsyslogd
- filename: cgconfig.conf
content: |
# Configuration for cgroups in VM compute nodes
@@ -69,6 +73,11 @@ files:
}
memory {}
}
# Create dummy rsyslog config, because it refuses to start without at least one action configured.
# compute_ctl will rewrite this file with the actual configuration, if needed.
- filename: compute_rsyslog.conf
content: |
*.* /dev/null
build: |
# Build cgroup-tools
#
@@ -128,6 +137,10 @@ merge: |
RUN set -e \
&& chmod 0644 /etc/cgconfig.conf
COPY compute_rsyslog.conf /etc/compute_rsyslog.conf
RUN set -e \
&& chmod 0644 /etc/compute_rsyslog.conf
COPY --from=libcgroup-builder /libcgroup-install/bin/* /usr/bin/
COPY --from=libcgroup-builder /libcgroup-install/lib/* /usr/lib/
COPY --from=libcgroup-builder /libcgroup-install/sbin/* /usr/sbin/

View File

@@ -17,6 +17,7 @@ aws-sdk-kms.workspace = true
aws-smithy-types.workspace = true
anyhow.workspace = true
axum = { workspace = true, features = [] }
axum-extra.workspace = true
camino.workspace = true
chrono.workspace = true
cfg-if.workspace = true
@@ -25,6 +26,7 @@ fail.workspace = true
flate2.workspace = true
futures.workspace = true
http.workspace = true
jsonwebtoken.workspace = true
metrics.workspace = true
nix.workspace = true
notify.workspace = true

View File

@@ -33,39 +33,27 @@
//! -b /usr/local/bin/postgres \
//! -r http://pg-ext-s3-gateway \
//! ```
use std::collections::HashMap;
use std::ffi::OsString;
use std::fs::File;
use std::path::Path;
use std::process::exit;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Condvar, Mutex, RwLock, mpsc};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Parser;
use compute_api::responses::{ComputeCtlConfig, ComputeStatus};
use compute_api::responses::ComputeCtlConfig;
use compute_api::spec::ComputeSpec;
use compute_tools::compute::{
ComputeNode, ComputeState, PG_PID, ParsedSpec, forward_termination_signal,
};
use compute_tools::configurator::launch_configurator;
use compute_tools::disk_quota::set_disk_quota;
use compute_tools::compute::{ComputeNode, ComputeNodeParams, forward_termination_signal};
use compute_tools::extension_server::get_pg_version_string;
use compute_tools::http::server::Server;
use compute_tools::logger::*;
use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static;
use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::spec::*;
use compute_tools::swap::resize_swap;
use rlimit::{Resource, setrlimit};
use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM};
use signal_hook::iterator::Signals;
use tracing::{error, info, warn};
use tracing::{error, info};
use url::Url;
use utils::failpoint_support;
@@ -164,29 +152,41 @@ fn main() -> Result<()> {
// enable core dumping for all child processes
setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
let (pg_handle, start_pg_result) = {
// Enter startup tracing context
let _startup_context_guard = startup_context_from_env();
let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?;
let cli_spec = try_spec_from_cli(&cli)?;
let cli_spec = try_spec_from_cli(&cli)?;
let compute = wait_spec(build_tag, &cli, cli_spec)?;
let compute_node = ComputeNode::new(
ComputeNodeParams {
compute_id: cli.compute_id,
connstr,
pgdata: cli.pgdata.clone(),
pgbin: cli.pgbin.clone(),
pgversion: get_pg_version_string(&cli.pgbin),
external_http_port: cli.external_http_port,
internal_http_port: cli.internal_http_port,
ext_remote_storage: cli.remote_ext_config.clone(),
resize_swap_on_bind: cli.resize_swap_on_bind,
set_disk_quota_for_fs: cli.set_disk_quota_for_fs,
#[cfg(target_os = "linux")]
filecache_connstr: cli.filecache_connstr,
#[cfg(target_os = "linux")]
cgroup: cli.cgroup,
#[cfg(target_os = "linux")]
vm_monitor_addr: cli.vm_monitor_addr,
build_tag,
start_postgres(&cli, compute)?
live_config_allowed: cli_spec.live_config_allowed,
},
cli_spec.spec,
cli_spec.compute_ctl_config,
)?;
// Startup is finished, exit the startup tracing span
};
// PostgreSQL is now running, if startup was successful. Wait until it exits.
let wait_pg_result = wait_postgres(pg_handle)?;
let delay_exit = cleanup_after_postgres_exit(start_pg_result)?;
maybe_delay_exit(delay_exit);
let exit_code = compute_node.run()?;
scenario.teardown();
deinit_and_exit(wait_pg_result);
deinit_and_exit(exit_code);
}
async fn init() -> Result<String> {
@@ -207,56 +207,6 @@ async fn init() -> Result<String> {
Ok(build_tag)
}
fn startup_context_from_env() -> Option<opentelemetry::ContextGuard> {
// Extract OpenTelemetry context for the startup actions from the
// TRACEPARENT and TRACESTATE env variables, and attach it to the current
// tracing context.
//
// This is used to propagate the context for the 'start_compute' operation
// from the neon control plane. This allows linking together the wider
// 'start_compute' operation that creates the compute container, with the
// startup actions here within the container.
//
// There is no standard for passing context in env variables, but a lot of
// tools use TRACEPARENT/TRACESTATE, so we use that convention too. See
// https://github.com/open-telemetry/opentelemetry-specification/issues/740
//
// Switch to the startup context here, and exit it once the startup has
// completed and Postgres is up and running.
//
// If this pod is pre-created without binding it to any particular endpoint
// yet, this isn't the right place to enter the startup context. In that
// case, the control plane should pass the tracing context as part of the
// /configure API call.
//
// NOTE: This is supposed to only cover the *startup* actions. Once
// postgres is configured and up-and-running, we exit this span. Any other
// actions that are performed on incoming HTTP requests, for example, are
// performed in separate spans.
//
// XXX: If the pod is restarted, we perform the startup actions in the same
// context as the original startup actions, which probably doesn't make
// sense.
let mut startup_tracing_carrier: HashMap<String, String> = HashMap::new();
if let Ok(val) = std::env::var("TRACEPARENT") {
startup_tracing_carrier.insert("traceparent".to_string(), val);
}
if let Ok(val) = std::env::var("TRACESTATE") {
startup_tracing_carrier.insert("tracestate".to_string(), val);
}
if !startup_tracing_carrier.is_empty() {
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry_sdk::propagation::TraceContextPropagator;
let guard = TraceContextPropagator::new()
.extract(&startup_tracing_carrier)
.attach();
info!("startup tracing context attached");
Some(guard)
} else {
None
}
}
fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
// First, try to get cluster spec from the cli argument
if let Some(ref spec_json) = cli.spec_json {
@@ -307,357 +257,7 @@ struct CliSpecParams {
live_config_allowed: bool,
}
fn wait_spec(
build_tag: String,
cli: &Cli,
CliSpecParams {
spec,
live_config_allowed,
compute_ctl_config: _,
}: CliSpecParams,
) -> Result<Arc<ComputeNode>> {
let mut new_state = ComputeState::new();
let spec_set;
if let Some(spec) = spec {
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
info!("new pspec.spec: {:?}", pspec.spec);
new_state.pspec = Some(pspec);
spec_set = true;
} else {
spec_set = false;
}
let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?;
let conn_conf = postgres::config::Config::from_str(connstr.as_str())
.context("cannot build postgres config from connstr")?;
let tokio_conn_conf = tokio_postgres::config::Config::from_str(connstr.as_str())
.context("cannot build tokio postgres config from connstr")?;
let compute_node = ComputeNode {
compute_id: cli.compute_id.clone(),
connstr,
conn_conf,
tokio_conn_conf,
pgdata: cli.pgdata.clone(),
pgbin: cli.pgbin.clone(),
pgversion: get_pg_version_string(&cli.pgbin),
external_http_port: cli.external_http_port,
internal_http_port: cli.internal_http_port,
live_config_allowed,
state: Mutex::new(new_state),
state_changed: Condvar::new(),
ext_remote_storage: cli.remote_ext_config.clone(),
ext_download_progress: RwLock::new(HashMap::new()),
build_tag,
};
let compute = Arc::new(compute_node);
// If this is a pooled VM, prewarm before starting HTTP server and becoming
// available for binding. Prewarming helps Postgres start quicker later,
// because QEMU will already have its memory allocated from the host, and
// the necessary binaries will already be cached.
if !spec_set {
compute.prewarm_postgres()?;
}
// Launch the external HTTP server first, so that we can serve control plane
// requests while configuration is still in progress.
Server::External(cli.external_http_port).launch(&compute);
// The internal HTTP server could be launched later, but there isn't much
// sense in waiting.
Server::Internal(cli.internal_http_port).launch(&compute);
if !spec_set {
// No spec provided, hang waiting for it.
info!("no compute spec provided, waiting");
let mut state = compute.state.lock().unwrap();
while state.status != ComputeStatus::ConfigurationPending {
state = compute.state_changed.wait(state).unwrap();
if state.status == ComputeStatus::ConfigurationPending {
info!("got spec, continue configuration");
// Spec is already set by the http server handler.
break;
}
}
// Record for how long we slept waiting for the spec.
let now = Utc::now();
state.metrics.wait_for_spec_ms = now
.signed_duration_since(state.start_time)
.to_std()
.unwrap()
.as_millis() as u64;
// Reset start time, so that the total startup time that is calculated later will
// not include the time that we waited for the spec.
state.start_time = now;
}
launch_lsn_lease_bg_task_for_static(&compute);
Ok(compute)
}
fn start_postgres(
cli: &Cli,
compute: Arc<ComputeNode>,
) -> Result<(Option<PostgresHandle>, StartPostgresResult)> {
// We got all we need, update the state.
let mut state = compute.state.lock().unwrap();
// Create a tracing span for the startup operation.
//
// We could otherwise just annotate the function with #[instrument], but if
// we're being configured from a /configure HTTP request, we want the
// startup to be considered part of the /configure request.
let _this_entered = {
// Temporarily enter the /configure request's span, so that the new span
// becomes its child.
let _parent_entered = state.startup_span.take().map(|p| p.entered());
tracing::info_span!("start_postgres")
}
.entered();
state.set_status(ComputeStatus::Init, &compute.state_changed);
info!(
"running compute with features: {:?}",
state.pspec.as_ref().unwrap().spec.features
);
// before we release the mutex, fetch some parameters for later.
let &ComputeSpec {
swap_size_bytes,
disk_quota_bytes,
#[cfg(target_os = "linux")]
disable_lfc_resizing,
..
} = &state.pspec.as_ref().unwrap().spec;
drop(state);
// Launch remaining service threads
let _monitor_handle = launch_monitor(&compute);
let _configurator_handle = launch_configurator(&compute);
let mut prestartup_failed = false;
let mut delay_exit = false;
// Resize swap to the desired size if the compute spec says so
if let (Some(size_bytes), true) = (swap_size_bytes, cli.resize_swap_on_bind) {
// To avoid 'swapoff' hitting postgres startup, we need to run resize-swap to completion
// *before* starting postgres.
//
// In theory, we could do this asynchronously if SkipSwapon was enabled for VMs, but this
// carries a risk of introducing hard-to-debug issues - e.g. if postgres sometimes gets
// OOM-killed during startup because swap wasn't available yet.
match resize_swap(size_bytes) {
Ok(()) => {
let size_mib = size_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
info!(%size_bytes, %size_mib, "resized swap");
}
Err(err) => {
let err = err.context("failed to resize swap");
error!("{err:#}");
// Mark compute startup as failed; don't try to start postgres, and report this
// error to the control plane when it next asks.
prestartup_failed = true;
compute.set_failed_status(err);
delay_exit = true;
}
}
}
// Set disk quota if the compute spec says so
if let (Some(disk_quota_bytes), Some(disk_quota_fs_mountpoint)) =
(disk_quota_bytes, cli.set_disk_quota_for_fs.as_ref())
{
match set_disk_quota(disk_quota_bytes, disk_quota_fs_mountpoint) {
Ok(()) => {
let size_mib = disk_quota_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
info!(%disk_quota_bytes, %size_mib, "set disk quota");
}
Err(err) => {
let err = err.context("failed to set disk quota");
error!("{err:#}");
// Mark compute startup as failed; don't try to start postgres, and report this
// error to the control plane when it next asks.
prestartup_failed = true;
compute.set_failed_status(err);
delay_exit = true;
}
}
}
// Start Postgres
let mut pg = None;
if !prestartup_failed {
pg = match compute.start_compute() {
Ok(pg) => {
info!(postmaster_pid = %pg.0.id(), "Postgres was started");
Some(pg)
}
Err(err) => {
error!("could not start the compute node: {:#}", err);
compute.set_failed_status(err);
delay_exit = true;
None
}
};
} else {
warn!("skipping postgres startup because pre-startup step failed");
}
// Start the vm-monitor if directed to. The vm-monitor only runs on linux
// because it requires cgroups.
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
use std::env;
use tokio_util::sync::CancellationToken;
// This token is used internally by the monitor to clean up all threads
let token = CancellationToken::new();
// don't pass postgres connection string to vm-monitor if we don't want it to resize LFC
let pgconnstr = if disable_lfc_resizing.unwrap_or(false) {
None
} else {
Some(cli.filecache_connstr.clone())
};
let vm_monitor = if env::var_os("AUTOSCALING").is_some() {
let vm_monitor = tokio::spawn(vm_monitor::start(
Box::leak(Box::new(vm_monitor::Args {
cgroup: Some(cli.cgroup.clone()),
pgconnstr,
addr: cli.vm_monitor_addr.clone(),
})),
token.clone(),
));
Some(vm_monitor)
} else {
None
};
}
}
Ok((
pg,
StartPostgresResult {
delay_exit,
compute,
#[cfg(target_os = "linux")]
token,
#[cfg(target_os = "linux")]
vm_monitor,
},
))
}
type PostgresHandle = (std::process::Child, tokio::task::JoinHandle<Result<()>>);
struct StartPostgresResult {
delay_exit: bool,
// passed through from WaitSpecResult
compute: Arc<ComputeNode>,
#[cfg(target_os = "linux")]
token: tokio_util::sync::CancellationToken,
#[cfg(target_os = "linux")]
vm_monitor: Option<tokio::task::JoinHandle<Result<()>>>,
}
fn wait_postgres(pg: Option<PostgresHandle>) -> Result<WaitPostgresResult> {
// Wait for the child Postgres process forever. In this state Ctrl+C will
// propagate to Postgres and it will be shut down as well.
let mut exit_code = None;
if let Some((mut pg, logs_handle)) = pg {
info!(postmaster_pid = %pg.id(), "Waiting for Postgres to exit");
let ecode = pg
.wait()
.expect("failed to start waiting on Postgres process");
PG_PID.store(0, Ordering::SeqCst);
// Process has exited. Wait for the log collecting task to finish.
let _ = tokio::runtime::Handle::current()
.block_on(logs_handle)
.map_err(|e| tracing::error!("log task panicked: {:?}", e));
info!("Postgres exited with code {}, shutting down", ecode);
exit_code = ecode.code()
}
Ok(WaitPostgresResult { exit_code })
}
struct WaitPostgresResult {
exit_code: Option<i32>,
}
fn cleanup_after_postgres_exit(
StartPostgresResult {
mut delay_exit,
compute,
#[cfg(target_os = "linux")]
vm_monitor,
#[cfg(target_os = "linux")]
token,
}: StartPostgresResult,
) -> Result<bool> {
// Terminate the vm_monitor so it releases the file watcher on
// /sys/fs/cgroup/neon-postgres.
// Note: the vm-monitor only runs on linux because it requires cgroups.
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
if let Some(handle) = vm_monitor {
// Kills all threads spawned by the monitor
token.cancel();
// Kills the actual task running the monitor
handle.abort();
}
}
}
// Maybe sync safekeepers again, to speed up next startup
let compute_state = compute.state.lock().unwrap().clone();
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
if matches!(pspec.spec.mode, compute_api::spec::ComputeMode::Primary) {
info!("syncing safekeepers on shutdown");
let storage_auth_token = pspec.storage_auth_token.clone();
let lsn = compute.sync_safekeepers(storage_auth_token)?;
info!("synced safekeepers at lsn {lsn}");
}
let mut state = compute.state.lock().unwrap();
if state.status == ComputeStatus::TerminationPending {
state.status = ComputeStatus::Terminated;
compute.state_changed.notify_all();
// we were asked to terminate gracefully, don't exit to avoid restart
delay_exit = true
}
drop(state);
if let Err(err) = compute.check_for_core_dumps() {
error!("error while checking for core dumps: {err:?}");
}
Ok(delay_exit)
}
fn maybe_delay_exit(delay_exit: bool) {
// If launch failed, keep serving HTTP requests for a while, so the cloud
// control plane can get the actual error.
if delay_exit {
info!("giving control plane 30s to collect the error before shutdown");
thread::sleep(Duration::from_secs(30));
}
}
fn deinit_and_exit(WaitPostgresResult { exit_code }: WaitPostgresResult) -> ! {
fn deinit_and_exit(exit_code: Option<i32>) -> ! {
// Shutdown trace pipeline gracefully, so that it has a chance to send any
// pending traces before we exit. Shutting down OTEL tracing provider may
// hang for quite some time, see, for example:

View File

@@ -58,14 +58,14 @@ pub async fn get_database_schema(
compute: &Arc<ComputeNode>,
dbname: &str,
) -> Result<impl Stream<Item = Result<bytes::Bytes, std::io::Error>> + use<>, SchemaDumpError> {
let pgbin = &compute.pgbin;
let pgbin = &compute.params.pgbin;
let basepath = Path::new(pgbin).parent().unwrap();
let pgdump = basepath.join("pg_dump");
// Replace the DB in the connection string and disable it to parts.
// This is the only option to handle DBs with special characters.
let conf =
postgres_conf_for_db(&compute.connstr, dbname).map_err(|_| SchemaDumpError::Unexpected)?;
let conf = postgres_conf_for_db(&compute.params.connstr, dbname)
.map_err(|_| SchemaDumpError::Unexpected)?;
let host = conf
.get_hosts()
.first()

File diff suppressed because it is too large Load Diff

View File

@@ -1,12 +1,16 @@
use anyhow::Result;
use std::fmt::Write as FmtWrite;
use std::fs::{File, OpenOptions};
use std::io;
use std::io::Write;
use std::io::prelude::*;
use std::path::Path;
use anyhow::Result;
use compute_api::spec::{ComputeMode, ComputeSpec, GenericOption};
use compute_api::spec::{ComputeAudit, ComputeMode, ComputeSpec, GenericOption};
use crate::pg_helpers::{GenericOptionExt, PgOptionsSerialize, escape_conf_value};
use crate::pg_helpers::{
GenericOptionExt, GenericOptionsSearch, PgOptionsSerialize, escape_conf_value,
};
/// Check that `line` is inside a text file and put it there if it is not.
/// Create file if it doesn't exist.
@@ -55,10 +59,20 @@ pub fn write_postgres_conf(
writeln!(file, "neon.stripe_size={stripe_size}")?;
}
if !spec.safekeeper_connstrings.is_empty() {
let mut neon_safekeepers_value = String::new();
tracing::info!(
"safekeepers_connstrings is not zero, gen: {:?}",
spec.safekeepers_generation
);
// If generation is given, prepend sk list with g#number:
if let Some(generation) = spec.safekeepers_generation {
write!(neon_safekeepers_value, "g#{}:", generation)?;
}
neon_safekeepers_value.push_str(&spec.safekeeper_connstrings.join(","));
writeln!(
file,
"neon.safekeepers={}",
escape_conf_value(&spec.safekeeper_connstrings.join(","))
escape_conf_value(&neon_safekeepers_value)
)?;
}
if let Some(s) = &spec.tenant_id {
@@ -126,6 +140,54 @@ pub fn write_postgres_conf(
writeln!(file, "# Managed by compute_ctl: end")?;
}
// If audit logging is enabled, configure pgaudit.
//
// Note, that this is called after the settings from spec are written.
// This way we always override the settings from the spec
// and don't allow the user or the control plane admin to change them.
if let ComputeAudit::Hipaa = spec.audit_log_level {
writeln!(file, "# Managed by compute_ctl audit settings: begin")?;
// This log level is very verbose
// but this is necessary for HIPAA compliance.
writeln!(file, "pgaudit.log='all'")?;
writeln!(file, "pgaudit.log_parameter=on")?;
// Disable logging of catalog queries
// The catalog doesn't contain sensitive data, so we don't need to audit it.
writeln!(file, "pgaudit.log_catalog=off")?;
// Set log rotation to 5 minutes
// TODO: tune this after performance testing
writeln!(file, "pgaudit.log_rotation_age=5")?;
// Add audit shared_preload_libraries, if they are not present.
//
// The caller who sets the flag is responsible for ensuring that the necessary
// shared_preload_libraries are present in the compute image,
// otherwise the compute start will fail.
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
let mut extra_shared_preload_libraries = String::new();
if !libs.contains("pgaudit") {
extra_shared_preload_libraries.push_str(",pgaudit");
}
if !libs.contains("pgauditlogtofile") {
extra_shared_preload_libraries.push_str(",pgauditlogtofile");
}
writeln!(
file,
"shared_preload_libraries='{}{}'",
libs, extra_shared_preload_libraries
)?;
} else {
// Typically, this should be unreacheable,
// because we always set at least some shared_preload_libraries in the spec
// but let's handle it explicitly anyway.
writeln!(
file,
"shared_preload_libraries='neon,pgaudit,pgauditlogtofile'"
)?;
}
writeln!(file, "# Managed by compute_ctl audit settings: end")?;
}
writeln!(file, "neon.extension_server_port={}", extension_server_port)?;
if spec.drop_subscriptions_before_start {

View File

@@ -0,0 +1,10 @@
# Load imfile module to read log files
module(load="imfile")
# Input configuration for log files in the specified directory
# Replace {log_directory} with the directory containing the log files
input(type="imfile" File="{log_directory}/*.log" Tag="{tag}" Severity="info" Facility="local0")
global(workDirectory="/var/log")
# Forward logs to remote syslog server
*.* @@{remote_endpoint}

View File

@@ -1,7 +1,9 @@
pub(crate) mod json;
pub(crate) mod path;
pub(crate) mod query;
pub(crate) mod request_id;
pub(crate) use json::Json;
pub(crate) use path::Path;
pub(crate) use query::Query;
pub(crate) use request_id::RequestId;

View File

@@ -0,0 +1,86 @@
use std::{
fmt::Display,
ops::{Deref, DerefMut},
};
use axum::{extract::FromRequestParts, response::IntoResponse};
use http::{StatusCode, request::Parts};
use crate::http::{JsonResponse, headers::X_REQUEST_ID};
/// Extract the request ID from the `X-Request-Id` header.
#[derive(Debug, Clone, Default)]
pub(crate) struct RequestId(pub String);
#[derive(Debug)]
/// Rejection used for [`RequestId`].
///
/// Contains one variant for each way the [`RequestId`] extractor can
/// fail.
pub(crate) enum RequestIdRejection {
/// The request is missing the header.
MissingRequestId,
/// The value of the header is invalid UTF-8.
InvalidUtf8,
}
impl RequestIdRejection {
pub fn status(&self) -> StatusCode {
match self {
RequestIdRejection::MissingRequestId => StatusCode::INTERNAL_SERVER_ERROR,
RequestIdRejection::InvalidUtf8 => StatusCode::BAD_REQUEST,
}
}
pub fn message(&self) -> String {
match self {
RequestIdRejection::MissingRequestId => "request ID is missing",
RequestIdRejection::InvalidUtf8 => "request ID is invalid UTF-8",
}
.to_string()
}
}
impl IntoResponse for RequestIdRejection {
fn into_response(self) -> axum::response::Response {
JsonResponse::error(self.status(), self.message())
}
}
impl<S> FromRequestParts<S> for RequestId
where
S: Send + Sync,
{
type Rejection = RequestIdRejection;
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
match parts.headers.get(X_REQUEST_ID) {
Some(value) => match value.to_str() {
Ok(request_id) => Ok(Self(request_id.to_string())),
Err(_) => Err(RequestIdRejection::InvalidUtf8),
},
None => Err(RequestIdRejection::MissingRequestId),
}
}
}
impl Deref for RequestId {
type Target = String;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for RequestId {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl Display for RequestId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}

View File

@@ -0,0 +1,2 @@
/// Constant for `X-Request-Id` header.
pub const X_REQUEST_ID: &str = "x-request-id";

View File

@@ -0,0 +1,145 @@
use std::{collections::HashSet, net::SocketAddr};
use anyhow::{Result, anyhow};
use axum::{RequestExt, body::Body, extract::ConnectInfo};
use axum_extra::{
TypedHeader,
headers::{Authorization, authorization::Bearer},
};
use futures::future::BoxFuture;
use http::{Request, Response, StatusCode};
use jsonwebtoken::{Algorithm, DecodingKey, TokenData, Validation, jwk::JwkSet};
use serde::Deserialize;
use tower_http::auth::AsyncAuthorizeRequest;
use tracing::warn;
use crate::http::{JsonResponse, extract::RequestId};
#[derive(Clone, Debug, Deserialize)]
pub(in crate::http) struct Claims {
compute_id: String,
}
#[derive(Clone, Debug)]
pub(in crate::http) struct Authorize {
compute_id: String,
jwks: JwkSet,
validation: Validation,
}
impl Authorize {
pub fn new(compute_id: String, jwks: JwkSet) -> Self {
let mut validation = Validation::new(Algorithm::EdDSA);
// Nothing is currently required
validation.required_spec_claims = HashSet::new();
validation.validate_exp = true;
// Unused by the control plane
validation.validate_aud = false;
// Unused by the control plane
validation.validate_nbf = false;
Self {
compute_id,
jwks,
validation,
}
}
}
impl AsyncAuthorizeRequest<Body> for Authorize {
type RequestBody = Body;
type ResponseBody = Body;
type Future = BoxFuture<'static, Result<Request<Body>, Response<Self::ResponseBody>>>;
fn authorize(&mut self, mut request: Request<Body>) -> Self::Future {
let compute_id = self.compute_id.clone();
let jwks = self.jwks.clone();
let validation = self.validation.clone();
Box::pin(async move {
let request_id = request.extract_parts::<RequestId>().await.unwrap();
// TODO: Remove this check after a successful rollout
if jwks.keys.is_empty() {
warn!(%request_id, "Authorization has not been configured");
return Ok(request);
}
let connect_info = request
.extract_parts::<ConnectInfo<SocketAddr>>()
.await
.unwrap();
// In the event the request is coming from the loopback interface,
// allow all requests
if connect_info.ip().is_loopback() {
warn!(%request_id, "Bypassed authorization because request is coming from the loopback interface");
return Ok(request);
}
let TypedHeader(Authorization(bearer)) = request
.extract_parts::<TypedHeader<Authorization<Bearer>>>()
.await
.map_err(|_| {
JsonResponse::error(StatusCode::BAD_REQUEST, "invalid authorization token")
})?;
let data = match Self::verify(&jwks, bearer.token(), &validation) {
Ok(claims) => claims,
Err(e) => return Err(JsonResponse::error(StatusCode::UNAUTHORIZED, e)),
};
if data.claims.compute_id != compute_id {
return Err(JsonResponse::error(
StatusCode::UNAUTHORIZED,
"invalid claims in authorization token",
));
}
// Make claims available to any subsequent middleware or request
// handlers
request.extensions_mut().insert(data.claims);
Ok(request)
})
}
}
impl Authorize {
/// Verify the token using the JSON Web Key set and return the token data.
fn verify(jwks: &JwkSet, token: &str, validation: &Validation) -> Result<TokenData<Claims>> {
debug_assert!(!jwks.keys.is_empty());
for jwk in jwks.keys.iter() {
let decoding_key = match DecodingKey::from_jwk(jwk) {
Ok(key) => key,
Err(e) => {
warn!(
"Failed to construct decoding key from {}: {}",
jwk.common.key_id.as_ref().unwrap(),
e
);
continue;
}
};
match jsonwebtoken::decode::<Claims>(token, &decoding_key, validation) {
Ok(data) => return Ok(data),
Err(e) => {
warn!(
"Failed to decode authorization token using {}: {}",
jwk.common.key_id.as_ref().unwrap(),
e
);
continue;
}
}
}
Err(anyhow!("Failed to verify authorization token"))
}
}

View File

@@ -0,0 +1 @@
pub(in crate::http) mod authorize;

View File

@@ -7,6 +7,8 @@ use serde::Serialize;
use tracing::error;
mod extract;
mod headers;
mod middleware;
mod routes;
pub mod server;

View File

@@ -22,7 +22,7 @@ pub(in crate::http) async fn configure(
State(compute): State<Arc<ComputeNode>>,
request: Json<ConfigurationRequest>,
) -> Response {
if !compute.live_config_allowed {
if !compute.params.live_config_allowed {
return JsonResponse::error(
StatusCode::PRECONDITION_FAILED,
"live configuration is not allowed for this compute node".to_string(),

View File

@@ -18,11 +18,11 @@ pub(in crate::http) struct ExtensionServerParams {
/// Download a remote extension.
pub(in crate::http) async fn download_extension(
Path(filename): Path<String>,
params: Query<ExtensionServerParams>,
ext_server_params: Query<ExtensionServerParams>,
State(compute): State<Arc<ComputeNode>>,
) -> Response {
// Don't even try to download extensions if no remote storage is configured
if compute.ext_remote_storage.is_none() {
if compute.params.ext_remote_storage.is_none() {
return JsonResponse::error(
StatusCode::PRECONDITION_FAILED,
"remote storage is not configured",
@@ -46,9 +46,9 @@ pub(in crate::http) async fn download_extension(
remote_extensions.get_ext(
&filename,
params.is_library,
&compute.build_tag,
&compute.pgversion,
ext_server_params.is_library,
&compute.params.build_tag,
&compute.params.pgversion,
)
};

View File

@@ -10,48 +10,58 @@ use axum::middleware::{self, Next};
use axum::response::{IntoResponse, Response};
use axum::routing::{get, post};
use http::StatusCode;
use jsonwebtoken::jwk::JwkSet;
use tokio::net::TcpListener;
use tower::ServiceBuilder;
use tower_http::request_id::PropagateRequestIdLayer;
use tower_http::trace::TraceLayer;
use tracing::{Span, debug, error, info};
use tower_http::{
auth::AsyncRequireAuthorizationLayer, request_id::PropagateRequestIdLayer, trace::TraceLayer,
};
use tracing::{Span, error, info};
use uuid::Uuid;
use super::routes::{
check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions,
grants, insights, metrics, metrics_json, status, terminate,
use super::{
headers::X_REQUEST_ID,
middleware::authorize::Authorize,
routes::{
check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions,
grants, insights, metrics, metrics_json, status, terminate,
},
};
use crate::compute::ComputeNode;
const X_REQUEST_ID: &str = "x-request-id";
/// `compute_ctl` has two servers: internal and external. The internal server
/// binds to the loopback interface and handles communication from clients on
/// the compute. The external server is what receives communication from the
/// control plane, the metrics scraper, etc. We make the distinction because
/// certain routes in `compute_ctl` only need to be exposed to local processes
/// like Postgres via the neon extension and local_proxy.
#[derive(Clone, Copy, Debug)]
#[derive(Clone, Debug)]
pub enum Server {
Internal(u16),
External(u16),
Internal {
port: u16,
},
External {
port: u16,
jwks: JwkSet,
compute_id: String,
},
}
impl Display for Server {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Server::Internal(_) => f.write_str("internal"),
Server::External(_) => f.write_str("external"),
Server::Internal { .. } => f.write_str("internal"),
Server::External { .. } => f.write_str("external"),
}
}
}
impl From<Server> for Router<Arc<ComputeNode>> {
fn from(server: Server) -> Self {
impl From<&Server> for Router<Arc<ComputeNode>> {
fn from(server: &Server) -> Self {
let mut router = Router::<Arc<ComputeNode>>::new();
router = match server {
Server::Internal(_) => {
Server::Internal { .. } => {
router = router
.route(
"/extension_server/{*filename}",
@@ -69,59 +79,71 @@ impl From<Server> for Router<Arc<ComputeNode>> {
router
}
Server::External(_) => router
.route("/check_writability", post(check_writability::is_writable))
.route("/configure", post(configure::configure))
.route("/database_schema", get(database_schema::get_schema_dump))
.route("/dbs_and_roles", get(dbs_and_roles::get_catalog_objects))
.route("/insights", get(insights::get_insights))
.route("/metrics", get(metrics::get_metrics))
.route("/metrics.json", get(metrics_json::get_metrics))
.route("/status", get(status::get_status))
.route("/terminate", post(terminate::terminate)),
Server::External {
jwks, compute_id, ..
} => {
let unauthenticated_router =
Router::<Arc<ComputeNode>>::new().route("/metrics", get(metrics::get_metrics));
let authenticated_router = Router::<Arc<ComputeNode>>::new()
.route("/check_writability", post(check_writability::is_writable))
.route("/configure", post(configure::configure))
.route("/database_schema", get(database_schema::get_schema_dump))
.route("/dbs_and_roles", get(dbs_and_roles::get_catalog_objects))
.route("/insights", get(insights::get_insights))
.route("/metrics.json", get(metrics_json::get_metrics))
.route("/status", get(status::get_status))
.route("/terminate", post(terminate::terminate))
.layer(AsyncRequireAuthorizationLayer::new(Authorize::new(
compute_id.clone(),
jwks.clone(),
)));
router
.merge(unauthenticated_router)
.merge(authenticated_router)
}
};
router.fallback(Server::handle_404).method_not_allowed_fallback(Server::handle_405).layer(
ServiceBuilder::new()
// Add this middleware since we assume the request ID exists
.layer(middleware::from_fn(maybe_add_request_id_header))
.layer(
TraceLayer::new_for_http()
.on_request(|request: &http::Request<_>, _span: &Span| {
let request_id = request
.headers()
.get(X_REQUEST_ID)
.unwrap()
.to_str()
.unwrap();
match request.uri().path() {
"/metrics" => {
debug!(%request_id, "{} {}", request.method(), request.uri())
}
_ => info!(%request_id, "{} {}", request.method(), request.uri()),
};
})
.on_response(
|response: &http::Response<_>, latency: Duration, _span: &Span| {
let request_id = response
router
.fallback(Server::handle_404)
.method_not_allowed_fallback(Server::handle_405)
.layer(
ServiceBuilder::new()
.layer(tower_otel::trace::HttpLayer::server(tracing::Level::INFO))
// Add this middleware since we assume the request ID exists
.layer(middleware::from_fn(maybe_add_request_id_header))
.layer(
TraceLayer::new_for_http()
.on_request(|request: &http::Request<_>, _span: &Span| {
let request_id = request
.headers()
.get(X_REQUEST_ID)
.unwrap()
.to_str()
.unwrap();
info!(
%request_id,
code = response.status().as_u16(),
latency = latency.as_millis()
)
},
),
)
.layer(PropagateRequestIdLayer::x_request_id()),
)
.layer(tower_otel::trace::HttpLayer::server(tracing::Level::INFO))
info!(%request_id, "{} {}", request.method(), request.uri());
})
.on_response(
|response: &http::Response<_>, latency: Duration, _span: &Span| {
let request_id = response
.headers()
.get(X_REQUEST_ID)
.unwrap()
.to_str()
.unwrap();
info!(
%request_id,
code = response.status().as_u16(),
latency = latency.as_millis()
);
},
),
)
.layer(PropagateRequestIdLayer::x_request_id()),
)
}
}
@@ -145,15 +167,15 @@ impl Server {
match self {
// TODO: Change this to Ipv6Addr::LOCALHOST when the GitHub runners
// allow binding to localhost
Server::Internal(_) => IpAddr::from(Ipv6Addr::UNSPECIFIED),
Server::External(_) => IpAddr::from(Ipv6Addr::UNSPECIFIED),
Server::Internal { .. } => IpAddr::from(Ipv6Addr::UNSPECIFIED),
Server::External { .. } => IpAddr::from(Ipv6Addr::UNSPECIFIED),
}
}
fn port(self) -> u16 {
fn port(&self) -> u16 {
match self {
Server::Internal(port) => port,
Server::External(port) => port,
Server::Internal { port, .. } => *port,
Server::External { port, .. } => *port,
}
}
@@ -180,7 +202,9 @@ impl Server {
);
}
let router = Router::from(self).with_state(compute);
let router = Router::from(&self)
.with_state(compute)
.into_make_service_with_connect_info::<SocketAddr>();
if let Err(e) = axum::serve(listener, router).await {
error!("compute_ctl {} HTTP server error: {}", self, e);

View File

@@ -21,6 +21,7 @@ mod migration;
pub mod monitor;
pub mod params;
pub mod pg_helpers;
pub mod rsyslog;
pub mod spec;
mod spec_apply;
pub mod swap;

View File

@@ -1,3 +1,5 @@
use std::collections::HashMap;
use tracing::info;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::prelude::*;
@@ -42,3 +44,50 @@ pub async fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result
pub fn inlinify(s: &str) -> String {
s.replace('\n', "\u{200B}")
}
pub fn startup_context_from_env() -> Option<opentelemetry::Context> {
// Extract OpenTelemetry context for the startup actions from the
// TRACEPARENT and TRACESTATE env variables, and attach it to the current
// tracing context.
//
// This is used to propagate the context for the 'start_compute' operation
// from the neon control plane. This allows linking together the wider
// 'start_compute' operation that creates the compute container, with the
// startup actions here within the container.
//
// There is no standard for passing context in env variables, but a lot of
// tools use TRACEPARENT/TRACESTATE, so we use that convention too. See
// https://github.com/open-telemetry/opentelemetry-specification/issues/740
//
// Switch to the startup context here, and exit it once the startup has
// completed and Postgres is up and running.
//
// If this pod is pre-created without binding it to any particular endpoint
// yet, this isn't the right place to enter the startup context. In that
// case, the control plane should pass the tracing context as part of the
// /configure API call.
//
// NOTE: This is supposed to only cover the *startup* actions. Once
// postgres is configured and up-and-running, we exit this span. Any other
// actions that are performed on incoming HTTP requests, for example, are
// performed in separate spans.
//
// XXX: If the pod is restarted, we perform the startup actions in the same
// context as the original startup actions, which probably doesn't make
// sense.
let mut startup_tracing_carrier: HashMap<String, String> = HashMap::new();
if let Ok(val) = std::env::var("TRACEPARENT") {
startup_tracing_carrier.insert("traceparent".to_string(), val);
}
if let Ok(val) = std::env::var("TRACESTATE") {
startup_tracing_carrier.insert("tracestate".to_string(), val);
}
if !startup_tracing_carrier.is_empty() {
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry_sdk::propagation::TraceContextPropagator;
info!("got startup tracing context from env variables");
Some(TraceContextPropagator::new().extract(&startup_tracing_carrier))
} else {
None
}
}

View File

@@ -18,7 +18,7 @@ const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
// should be handled gracefully.
fn watch_compute_activity(compute: &ComputeNode) {
// Suppose that `connstr` doesn't change
let connstr = compute.connstr.clone();
let connstr = compute.params.connstr.clone();
let conf = compute.get_conn_conf(Some("compute_ctl:activity_monitor"));
// During startup and configuration we connect to every Postgres database,

View File

@@ -0,0 +1,91 @@
use std::process::Command;
use std::{fs::OpenOptions, io::Write};
use anyhow::{Context, Result};
use tracing::info;
fn get_rsyslog_pid() -> Option<String> {
let output = Command::new("pgrep")
.arg("rsyslogd")
.output()
.expect("Failed to execute pgrep");
if !output.stdout.is_empty() {
let pid = std::str::from_utf8(&output.stdout)
.expect("Invalid UTF-8 in process output")
.trim()
.to_string();
Some(pid)
} else {
None
}
}
// Start rsyslogd with the specified configuration file
// If it is already running - restart it.
// This is necessary, because there is no other way to reload the rsyslog configuration.
//
// Rsyslogd shouldn't lose any messages, because of the restart,
// because it tracks the last read position in the log files
// and will continue reading from that position.
// TODO: test it properly
//
fn start_rsyslog(rsyslog_conf_path: &str) -> Result<()> {
let old_pid = get_rsyslog_pid();
if let Some(pid) = old_pid {
info!("rsyslogd is already running with pid: {}, restart it", pid);
// kill it to restart
let _ = Command::new("pkill")
.arg("rsyslogd")
.output()
.context("Failed to stop rsyslogd")?;
}
let _ = Command::new("/usr/sbin/rsyslogd")
.arg("-f")
.arg(rsyslog_conf_path)
.arg("-i")
.arg("/var/run/rsyslogd/rsyslogd.pid")
.output()
.context("Failed to start rsyslogd")?;
// Check that rsyslogd is running
if let Some(pid) = get_rsyslog_pid() {
info!("rsyslogd started successfully with pid: {}", pid);
} else {
return Err(anyhow::anyhow!("Failed to start rsyslogd"));
}
Ok(())
}
pub fn configure_and_start_rsyslog(
log_directory: &str,
tag: &str,
remote_endpoint: &str,
) -> Result<()> {
let config_content: String = format!(
include_str!("config_template/compute_rsyslog_template.conf"),
log_directory = log_directory,
tag = tag,
remote_endpoint = remote_endpoint
);
info!("rsyslog config_content: {}", config_content);
let rsyslog_conf_path = "/etc/compute_rsyslog.conf";
let mut file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(rsyslog_conf_path)?;
file.write_all(config_content.as_bytes())?;
info!("rsyslog configuration added successfully. Starting rsyslogd");
// start the service, using the configuration
start_rsyslog(rsyslog_conf_path)?;
Ok(())
}

View File

@@ -4,15 +4,426 @@ use std::future::Future;
use std::iter::{empty, once};
use std::sync::Arc;
use anyhow::Result;
use compute_api::spec::{ComputeFeature, ComputeSpec, Database, PgIdent, Role};
use anyhow::{Context, Result};
use compute_api::responses::ComputeStatus;
use compute_api::spec::{ComputeAudit, ComputeFeature, ComputeSpec, Database, PgIdent, Role};
use futures::future::join_all;
use tokio::sync::RwLock;
use tokio_postgres::Client;
use tracing::{Instrument, debug, info_span, warn};
use tokio_postgres::error::SqlState;
use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
use crate::compute::construct_superuser_query;
use crate::pg_helpers::{DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, escape_literal};
use crate::compute::{ComputeNode, ComputeState, construct_superuser_query};
use crate::pg_helpers::{
DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, escape_literal, get_existing_dbs_async,
get_existing_roles_async,
};
use crate::spec_apply::ApplySpecPhase::{
CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreatePgauditExtension,
CreatePgauditlogtofileExtension, CreateSchemaNeon, CreateSuperUser, DisablePostgresDBPgAudit,
DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions, HandleNeonExtension,
HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase,
};
use crate::spec_apply::PerDatabasePhase::{
ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions, HandleAnonExtension,
};
impl ComputeNode {
/// Apply the spec to the running PostgreSQL instance.
/// The caller can decide to run with multiple clients in parallel, or
/// single mode. Either way, the commands executed will be the same, and
/// only commands run in different databases are parallelized.
#[instrument(skip_all)]
pub fn apply_spec_sql(
&self,
spec: Arc<ComputeSpec>,
conf: Arc<tokio_postgres::Config>,
concurrency: usize,
) -> Result<()> {
info!("Applying config with max {} concurrency", concurrency);
debug!("Config: {:?}", spec);
let rt = tokio::runtime::Handle::current();
rt.block_on(async {
// Proceed with post-startup configuration. Note, that order of operations is important.
let client = Self::get_maintenance_client(&conf).await?;
let spec = spec.clone();
let databases = get_existing_dbs_async(&client).await?;
let roles = get_existing_roles_async(&client)
.await?
.into_iter()
.map(|role| (role.name.clone(), role))
.collect::<HashMap<String, Role>>();
// Check if we need to drop subscriptions before starting the endpoint.
//
// It is important to do this operation exactly once when endpoint starts on a new branch.
// Otherwise, we may drop not inherited, but newly created subscriptions.
//
// We cannot rely only on spec.drop_subscriptions_before_start flag,
// because if for some reason compute restarts inside VM,
// it will start again with the same spec and flag value.
//
// To handle this, we save the fact of the operation in the database
// in the neon.drop_subscriptions_done table.
// If the table does not exist, we assume that the operation was never performed, so we must do it.
// If table exists, we check if the operation was performed on the current timelilne.
//
let mut drop_subscriptions_done = false;
if spec.drop_subscriptions_before_start {
let timeline_id = self.get_timeline_id().context("timeline_id must be set")?;
let query = format!("select 1 from neon.drop_subscriptions_done where timeline_id = '{}'", timeline_id);
info!("Checking if drop subscription operation was already performed for timeline_id: {}", timeline_id);
drop_subscriptions_done = match
client.simple_query(&query).await {
Ok(result) => {
matches!(&result[0], postgres::SimpleQueryMessage::Row(_))
},
Err(e) =>
{
match e.code() {
Some(&SqlState::UNDEFINED_TABLE) => false,
_ => {
// We don't expect any other error here, except for the schema/table not existing
error!("Error checking if drop subscription operation was already performed: {}", e);
return Err(e.into());
}
}
}
}
};
let jwks_roles = Arc::new(
spec.as_ref()
.local_proxy_config
.iter()
.flat_map(|it| &it.jwks)
.flatten()
.flat_map(|setting| &setting.role_names)
.cloned()
.collect::<HashSet<_>>(),
);
let ctx = Arc::new(tokio::sync::RwLock::new(MutableApplyContext {
roles,
dbs: databases,
}));
// Apply special pre drop database phase.
// NOTE: we use the code of RunInEachDatabase phase for parallelism
// and connection management, but we don't really run it in *each* database,
// only in databases, we're about to drop.
info!("Applying PerDatabase (pre-dropdb) phase");
let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
// Run the phase for each database that we're about to drop.
let db_processes = spec
.delta_operations
.iter()
.flatten()
.filter_map(move |op| {
if op.action.as_str() == "delete_db" {
Some(op.name.clone())
} else {
None
}
})
.map(|dbname| {
let spec = spec.clone();
let ctx = ctx.clone();
let jwks_roles = jwks_roles.clone();
let mut conf = conf.as_ref().clone();
let concurrency_token = concurrency_token.clone();
// We only need dbname field for this phase, so set other fields to dummy values
let db = DB::UserDB(Database {
name: dbname.clone(),
owner: "cloud_admin".to_string(),
options: None,
restrict_conn: false,
invalid: false,
});
debug!("Applying per-database phases for Database {:?}", &db);
match &db {
DB::SystemDB => {}
DB::UserDB(db) => {
conf.dbname(db.name.as_str());
}
}
let conf = Arc::new(conf);
let fut = Self::apply_spec_sql_db(
spec.clone(),
conf,
ctx.clone(),
jwks_roles.clone(),
concurrency_token.clone(),
db,
[DropLogicalSubscriptions].to_vec(),
);
Ok(tokio::spawn(fut))
})
.collect::<Vec<Result<_, anyhow::Error>>>();
for process in db_processes.into_iter() {
let handle = process?;
if let Err(e) = handle.await? {
// Handle the error case where the database does not exist
// We do not check whether the DB exists or not in the deletion phase,
// so we shouldn't be strict about it in pre-deletion cleanup as well.
if e.to_string().contains("does not exist") {
warn!("Error dropping subscription: {}", e);
} else {
return Err(e);
}
};
}
for phase in [
CreateSuperUser,
DropInvalidDatabases,
RenameRoles,
CreateAndAlterRoles,
RenameAndDeleteDatabases,
CreateAndAlterDatabases,
CreateSchemaNeon,
] {
info!("Applying phase {:?}", &phase);
apply_operations(
spec.clone(),
ctx.clone(),
jwks_roles.clone(),
phase,
|| async { Ok(&client) },
)
.await?;
}
info!("Applying RunInEachDatabase2 phase");
let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
let db_processes = spec
.cluster
.databases
.iter()
.map(|db| DB::new(db.clone()))
// include
.chain(once(DB::SystemDB))
.map(|db| {
let spec = spec.clone();
let ctx = ctx.clone();
let jwks_roles = jwks_roles.clone();
let mut conf = conf.as_ref().clone();
let concurrency_token = concurrency_token.clone();
let db = db.clone();
debug!("Applying per-database phases for Database {:?}", &db);
match &db {
DB::SystemDB => {}
DB::UserDB(db) => {
conf.dbname(db.name.as_str());
}
}
let conf = Arc::new(conf);
let mut phases = vec![
DeleteDBRoleReferences,
ChangeSchemaPerms,
HandleAnonExtension,
];
if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
info!("Adding DropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
phases.push(DropLogicalSubscriptions);
}
let fut = Self::apply_spec_sql_db(
spec.clone(),
conf,
ctx.clone(),
jwks_roles.clone(),
concurrency_token.clone(),
db,
phases,
);
Ok(tokio::spawn(fut))
})
.collect::<Vec<Result<_, anyhow::Error>>>();
for process in db_processes.into_iter() {
let handle = process?;
handle.await??;
}
let mut phases = vec![
HandleOtherExtensions,
HandleNeonExtension, // This step depends on CreateSchemaNeon
CreateAvailabilityCheck,
DropRoles,
];
// This step depends on CreateSchemaNeon
if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
info!("Adding FinalizeDropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
phases.push(FinalizeDropLogicalSubscriptions);
}
// Keep DisablePostgresDBPgAudit phase at the end,
// so that all config operations are audit logged.
match spec.audit_log_level
{
ComputeAudit::Hipaa => {
phases.push(CreatePgauditExtension);
phases.push(CreatePgauditlogtofileExtension);
phases.push(DisablePostgresDBPgAudit);
}
ComputeAudit::Log => { /* not implemented yet */ }
ComputeAudit::Disabled => {}
}
for phase in phases {
debug!("Applying phase {:?}", &phase);
apply_operations(
spec.clone(),
ctx.clone(),
jwks_roles.clone(),
phase,
|| async { Ok(&client) },
)
.await?;
}
Ok::<(), anyhow::Error>(())
})?;
Ok(())
}
/// Apply SQL migrations of the RunInEachDatabase phase.
///
/// May opt to not connect to databases that don't have any scheduled
/// operations. The function is concurrency-controlled with the provided
/// semaphore. The caller has to make sure the semaphore isn't exhausted.
async fn apply_spec_sql_db(
spec: Arc<ComputeSpec>,
conf: Arc<tokio_postgres::Config>,
ctx: Arc<tokio::sync::RwLock<MutableApplyContext>>,
jwks_roles: Arc<HashSet<String>>,
concurrency_token: Arc<tokio::sync::Semaphore>,
db: DB,
subphases: Vec<PerDatabasePhase>,
) -> Result<()> {
let _permit = concurrency_token.acquire().await?;
let mut client_conn = None;
for subphase in subphases {
apply_operations(
spec.clone(),
ctx.clone(),
jwks_roles.clone(),
RunInEachDatabase {
db: db.clone(),
subphase,
},
// Only connect if apply_operation actually wants a connection.
// It's quite possible this database doesn't need any queries,
// so by not connecting we save time and effort connecting to
// that database.
|| async {
if client_conn.is_none() {
let db_client = Self::get_maintenance_client(&conf).await?;
client_conn.replace(db_client);
}
let client = client_conn.as_ref().unwrap();
Ok(client)
},
)
.await?;
}
drop(client_conn);
Ok::<(), anyhow::Error>(())
}
/// Choose how many concurrent connections to use for applying the spec changes.
pub fn max_service_connections(
&self,
compute_state: &ComputeState,
spec: &ComputeSpec,
) -> usize {
// If the cluster is in Init state we don't have to deal with user connections,
// and can thus use all `max_connections` connection slots. However, that's generally not
// very efficient, so we generally still limit it to a smaller number.
if compute_state.status == ComputeStatus::Init {
// If the settings contain 'max_connections', use that as template
if let Some(config) = spec.cluster.settings.find("max_connections") {
config.parse::<usize>().ok()
} else {
// Otherwise, try to find the setting in the postgresql_conf string
spec.cluster
.postgresql_conf
.iter()
.flat_map(|conf| conf.split("\n"))
.filter_map(|line| {
if !line.contains("max_connections") {
return None;
}
let (key, value) = line.split_once("=")?;
let key = key
.trim_start_matches(char::is_whitespace)
.trim_end_matches(char::is_whitespace);
let value = value
.trim_start_matches(char::is_whitespace)
.trim_end_matches(char::is_whitespace);
if key != "max_connections" {
return None;
}
value.parse::<usize>().ok()
})
.next()
}
// If max_connections is present, use at most 1/3rd of that.
// When max_connections is lower than 30, try to use at least 10 connections, but
// never more than max_connections.
.map(|limit| match limit {
0..10 => limit,
10..30 => 10,
30.. => limit / 3,
})
// If we didn't find max_connections, default to 10 concurrent connections.
.unwrap_or(10)
} else {
// state == Running
// Because the cluster is already in the Running state, we should assume users are
// already connected to the cluster, and high concurrency could negatively
// impact user connectivity. Therefore, we can limit concurrency to the number of
// reserved superuser connections, which users wouldn't be able to use anyway.
spec.cluster
.settings
.find("superuser_reserved_connections")
.iter()
.filter_map(|val| val.parse::<usize>().ok())
.map(|val| if val > 1 { val - 1 } else { 1 })
.last()
.unwrap_or(3)
}
}
}
#[derive(Clone)]
pub enum DB {
@@ -65,6 +476,9 @@ pub enum ApplySpecPhase {
CreateAndAlterDatabases,
CreateSchemaNeon,
RunInEachDatabase { db: DB, subphase: PerDatabasePhase },
CreatePgauditExtension,
CreatePgauditlogtofileExtension,
DisablePostgresDBPgAudit,
HandleOtherExtensions,
HandleNeonExtension,
CreateAvailabilityCheck,
@@ -700,6 +1114,25 @@ async fn get_operations<'a>(
}
Ok(Box::new(empty()))
}
ApplySpecPhase::CreatePgauditExtension => Ok(Box::new(once(Operation {
query: String::from("CREATE EXTENSION IF NOT EXISTS pgaudit"),
comment: Some(String::from("create pgaudit extensions")),
}))),
ApplySpecPhase::CreatePgauditlogtofileExtension => Ok(Box::new(once(Operation {
query: String::from("CREATE EXTENSION IF NOT EXISTS pgauditlogtofile"),
comment: Some(String::from("create pgauditlogtofile extensions")),
}))),
// Disable pgaudit logging for postgres database.
// Postgres is neon system database used by monitors
// and compute_ctl tuning functions and thus generates a lot of noise.
// We do not consider data stored in this database as sensitive.
ApplySpecPhase::DisablePostgresDBPgAudit => {
let query = "ALTER DATABASE postgres SET pgaudit.log to 'none'";
Ok(Box::new(once(Operation {
query: query.to_string(),
comment: Some(query.to_string()),
})))
}
ApplySpecPhase::HandleNeonExtension => {
let operations = vec![
Operation {

View File

@@ -25,7 +25,7 @@ use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use nix::errno::Errno;
use nix::fcntl::{FcntlArg, FdFlag};
use nix::sys::signal::{kill, Signal};
use nix::sys::signal::{Signal, kill};
use nix::unistd::Pid;
use utils::pid_file::{self, PidFileRead};

View File

@@ -5,7 +5,16 @@
//! easier to work with locally. The python tests in `test_runner`
//! rely on `neon_local` to set up the environment for each test.
//!
use anyhow::{anyhow, bail, Context, Result};
use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::fs::File;
use std::os::fd::AsRawFd;
use std::path::PathBuf;
use std::process::exit;
use std::str::FromStr;
use std::time::Duration;
use anyhow::{Context, Result, anyhow, bail};
use clap::Parser;
use compute_api::spec::ComputeMode;
use control_plane::endpoint::ComputeControlPlane;
@@ -19,7 +28,7 @@ use control_plane::storage_controller::{
NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController,
};
use control_plane::{broker, local_env};
use nix::fcntl::{flock, FlockArg};
use nix::fcntl::{FlockArg, flock};
use pageserver_api::config::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
@@ -31,27 +40,18 @@ use pageserver_api::models::{ShardParameters, TimelineCreateRequest, TimelineInf
use pageserver_api::shard::{ShardCount, ShardStripeSize, TenantShardId};
use postgres_backend::AuthType;
use postgres_connection::parse_host_port;
use safekeeper_api::membership::SafekeeperGeneration;
use safekeeper_api::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
};
use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::fs::File;
use std::os::fd::AsRawFd;
use std::path::PathBuf;
use std::process::exit;
use std::str::FromStr;
use std::time::Duration;
use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR;
use tokio::task::JoinSet;
use url::Host;
use utils::{
auth::{Claims, Scope},
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
project_git_version,
};
use utils::auth::{Claims, Scope};
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use utils::project_git_version;
// Default id of a safekeeper node, if not specified on the command line.
const DEFAULT_SAFEKEEPER_ID: NodeId = NodeId(1);
@@ -597,7 +597,15 @@ struct EndpointStartCmdArgs {
#[clap(long = "pageserver-id")]
endpoint_pageserver_id: Option<NodeId>,
#[clap(long)]
#[clap(
long,
help = "Safekeepers membership generation to prefix neon.safekeepers with. Normally neon_local sets it on its own, but this option allows to override. Non zero value forces endpoint to use membership configurations."
)]
safekeepers_generation: Option<u32>,
#[clap(
long,
help = "List of safekeepers endpoint will talk to. Normally neon_local chooses them on its own, but this option allows to override."
)]
safekeepers: Option<String>,
#[clap(
@@ -618,9 +626,9 @@ struct EndpointStartCmdArgs {
)]
allow_multiple: bool,
#[clap(short = 't', long, help = "timeout until we fail the command")]
#[arg(default_value = "10s")]
start_timeout: humantime::Duration,
#[clap(short = 't', long, value_parser= humantime::parse_duration, help = "timeout until we fail the command")]
#[arg(default_value = "90s")]
start_timeout: Duration,
}
#[derive(clap::Args)]
@@ -921,7 +929,9 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
let init_conf: NeonLocalInitConf = if let Some(config_path) = &args.config {
// User (likely the Python test suite) provided a description of the environment.
if args.num_pageservers.is_some() {
bail!("Cannot specify both --num-pageservers and --config, use key `pageservers` in the --config file instead");
bail!(
"Cannot specify both --num-pageservers and --config, use key `pageservers` in the --config file instead"
);
}
// load and parse the file
let contents = std::fs::read_to_string(config_path).with_context(|| {
@@ -1315,10 +1325,14 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
match (mode, args.hot_standby) {
(ComputeMode::Static(_), true) => {
bail!("Cannot start a node in hot standby mode when it is already configured as a static replica")
bail!(
"Cannot start a node in hot standby mode when it is already configured as a static replica"
)
}
(ComputeMode::Primary, true) => {
bail!("Cannot start a node as a hot standby replica, it is already configured as primary node")
bail!(
"Cannot start a node as a hot standby replica, it is already configured as primary node"
)
}
_ => {}
}
@@ -1345,6 +1359,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
let pageserver_id = args.endpoint_pageserver_id;
let remote_ext_config = &args.remote_ext_config;
let safekeepers_generation = args.safekeepers_generation.map(SafekeeperGeneration::new);
// If --safekeepers argument is given, use only the listed
// safekeeper nodes; otherwise all from the env.
let safekeepers = if let Some(safekeepers) = parse_safekeepers(&args.safekeepers)? {
@@ -1420,11 +1435,13 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
endpoint
.start(
&auth_token,
safekeepers_generation,
safekeepers,
pageservers,
remote_ext_config.as_ref(),
stripe_size.0 as usize,
args.create_test_user,
args.start_timeout,
)
.await?;
}

View File

@@ -8,7 +8,6 @@
use std::time::Duration;
use anyhow::Context;
use camino::Utf8PathBuf;
use crate::{background_process, local_env};

View File

@@ -37,29 +37,24 @@
//! ```
//!
use std::collections::BTreeMap;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::net::TcpStream;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
use std::path::PathBuf;
use std::process::Command;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use anyhow::{anyhow, bail, Context, Result};
use anyhow::{Context, Result, anyhow, bail};
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::ComputeCtlConfig;
use compute_api::spec::Database;
use compute_api::spec::PgIdent;
use compute_api::spec::RemoteExtSpec;
use compute_api::spec::Role;
use nix::sys::signal::kill;
use nix::sys::signal::Signal;
use compute_api::responses::{ComputeCtlConfig, ComputeStatus, ComputeStatusResponse};
use compute_api::spec::{
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent,
RemoteExtSpec, Role,
};
use nix::sys::signal::{Signal, kill};
use pageserver_api::shard::ShardStripeSize;
use reqwest::header::CONTENT_TYPE;
use safekeeper_api::membership::SafekeeperGeneration;
use serde::{Deserialize, Serialize};
use tracing::debug;
use url::Host;
@@ -69,9 +64,6 @@ use crate::local_env::LocalEnv;
use crate::postgresql_conf::PostgresConf;
use crate::storage_controller::StorageController;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse};
use compute_api::spec::{Cluster, ComputeFeature, ComputeMode, ComputeSpec};
// contents of a endpoint.json file
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct EndpointConf {
@@ -237,7 +229,9 @@ impl ComputeControlPlane {
});
if let Some((key, _)) = duplicates.next() {
bail!("attempting to create a duplicate primary endpoint on tenant {tenant_id}, timeline {timeline_id}: endpoint {key:?} exists already. please don't do this, it is not supported.");
bail!(
"attempting to create a duplicate primary endpoint on tenant {tenant_id}, timeline {timeline_id}: endpoint {key:?} exists already. please don't do this, it is not supported."
);
}
}
Ok(())
@@ -584,14 +578,17 @@ impl Endpoint {
Ok(safekeeper_connstrings)
}
#[allow(clippy::too_many_arguments)]
pub async fn start(
&self,
auth_token: &Option<String>,
safekeepers_generation: Option<SafekeeperGeneration>,
safekeepers: Vec<NodeId>,
pageservers: Vec<(Host, u16)>,
remote_ext_config: Option<&String>,
shard_stripe_size: usize,
create_test_user: bool,
start_timeout: Duration,
) -> Result<()> {
if self.status() == EndpointStatus::Running {
anyhow::bail!("The endpoint is already running");
@@ -663,6 +660,7 @@ impl Endpoint {
timeline_id: Some(self.timeline_id),
mode: self.mode,
pageserver_connstring: Some(pageserver_connstring),
safekeepers_generation: safekeepers_generation.map(|g| g.into_inner()),
safekeeper_connstrings,
storage_auth_token: auth_token.clone(),
remote_extensions,
@@ -671,6 +669,7 @@ impl Endpoint {
local_proxy_config: None,
reconfigure_concurrency: self.reconfigure_concurrency,
drop_subscriptions_before_start: self.drop_subscriptions_before_start,
audit_log_level: ComputeAudit::Disabled,
};
// this strange code is needed to support respec() in tests
@@ -778,17 +777,18 @@ impl Endpoint {
std::fs::write(pidfile_path, pid.to_string())?;
// Wait for it to start
let mut attempt = 0;
const ATTEMPT_INTERVAL: Duration = Duration::from_millis(100);
const MAX_ATTEMPTS: u32 = 10 * 90; // Wait up to 1.5 min
let start_at = Instant::now();
loop {
attempt += 1;
match self.get_status().await {
Ok(state) => {
match state.status {
ComputeStatus::Init => {
if attempt == MAX_ATTEMPTS {
bail!("compute startup timed out; still in Init state");
if Instant::now().duration_since(start_at) > start_timeout {
bail!(
"compute startup timed out {:?}; still in Init state",
start_timeout
);
}
// keep retrying
}
@@ -815,8 +815,11 @@ impl Endpoint {
}
}
Err(e) => {
if attempt == MAX_ATTEMPTS {
return Err(e).context("timed out waiting to connect to compute_ctl HTTP");
if Instant::now().duration_since(start_at) > start_timeout {
return Err(e).context(format!(
"timed out {:?} waiting to connect to compute_ctl HTTP",
start_timeout,
));
}
}
}

View File

@@ -3,28 +3,22 @@
//! Now it also provides init method which acts like a stub for proper installation
//! script which will use local paths.
use anyhow::{bail, Context};
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::time::Duration;
use std::{env, fs};
use anyhow::{Context, bail};
use clap::ValueEnum;
use postgres_backend::AuthType;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::env;
use std::fs;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::time::Duration;
use utils::{
auth::{encode_from_key_file, Claims},
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
};
use utils::auth::{Claims, encode_from_key_file};
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use crate::pageserver::PageServerNode;
use crate::pageserver::PAGESERVER_REMOTE_STORAGE_DIR;
use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode};
use crate::safekeeper::SafekeeperNode;
pub const DEFAULT_PG_VERSION: u32 = 16;
@@ -171,6 +165,8 @@ pub struct NeonStorageControllerConf {
#[serde(with = "humantime_serde")]
pub long_reconcile_threshold: Option<Duration>,
pub load_safekeepers: bool,
}
impl NeonStorageControllerConf {
@@ -194,6 +190,7 @@ impl Default for NeonStorageControllerConf {
max_secondary_lag_bytes: None,
heartbeat_interval: Self::DEFAULT_HEARTBEAT_INTERVAL,
long_reconcile_threshold: None,
load_safekeepers: true,
}
}
}
@@ -465,7 +462,9 @@ impl LocalEnv {
if old_timeline_id == &timeline_id {
Ok(())
} else {
bail!("branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}");
bail!(
"branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}"
);
}
} else {
existing_values.push((tenant_id, timeline_id));

View File

@@ -7,7 +7,6 @@
//! ```
//!
use std::collections::HashMap;
use std::io;
use std::io::Write;
use std::num::NonZeroU64;
@@ -15,22 +14,19 @@ use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
use anyhow::{bail, Context};
use anyhow::{Context, bail};
use camino::Utf8PathBuf;
use pageserver_api::models::{self, TenantInfo, TimelineInfo};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use postgres_backend::AuthType;
use postgres_connection::{parse_host_port, PgConnectionConfig};
use postgres_connection::{PgConnectionConfig, parse_host_port};
use utils::auth::{Claims, Scope};
use utils::id::NodeId;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
use crate::local_env::{NeonLocalInitPageserverConf, PageServerConf};
use crate::{background_process, local_env::LocalEnv};
use crate::background_process;
use crate::local_env::{LocalEnv, NeonLocalInitPageserverConf, PageServerConf};
/// Directory within .neon which will be used by default for LocalFs remote storage.
pub const PAGESERVER_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/pageserver";
@@ -81,7 +77,11 @@ impl PageServerNode {
&self,
conf: NeonLocalInitPageserverConf,
) -> anyhow::Result<toml_edit::DocumentMut> {
assert_eq!(&PageServerConf::from(&conf), &self.conf, "during neon_local init, we derive the runtime state of ps conf (self.conf) from the --config flag fully");
assert_eq!(
&PageServerConf::from(&conf),
&self.conf,
"during neon_local init, we derive the runtime state of ps conf (self.conf) from the --config flag fully"
);
// TODO(christian): instead of what we do here, create a pageserver_api::config::ConfigToml (PR #7656)

View File

@@ -1,3 +1,6 @@
use std::collections::HashMap;
use std::fmt;
///
/// Module for parsing postgresql.conf file.
///
@@ -6,8 +9,6 @@
/// funny stuff like include-directives or funny escaping.
use once_cell::sync::Lazy;
use regex::Regex;
use std::collections::HashMap;
use std::fmt;
/// In-memory representation of a postgresql.conf file
#[derive(Default, Debug)]

View File

@@ -14,18 +14,15 @@ use std::{io, result};
use anyhow::Context;
use camino::Utf8PathBuf;
use http_utils::error::HttpErrorBody;
use postgres_connection::PgConnectionConfig;
use reqwest::{IntoUrl, Method};
use thiserror::Error;
use http_utils::error::HttpErrorBody;
use utils::auth::{Claims, Scope};
use utils::id::NodeId;
use crate::{
background_process,
local_env::{LocalEnv, SafekeeperConf},
};
use crate::background_process;
use crate::local_env::{LocalEnv, SafekeeperConf};
#[derive(Error, Debug)]
pub enum SafekeeperHttpError {

View File

@@ -1,44 +1,39 @@
use crate::{
background_process,
local_env::{LocalEnv, NeonStorageControllerConf},
};
use std::ffi::OsStr;
use std::fs;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::process::ExitStatus;
use std::str::FromStr;
use std::sync::OnceLock;
use std::time::{Duration, Instant};
use camino::{Utf8Path, Utf8PathBuf};
use hyper0::Uri;
use nix::unistd::Pid;
use pageserver_api::{
controller_api::{
NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
TenantCreateResponse, TenantLocateResponse, TenantShardMigrateRequest,
TenantShardMigrateResponse,
},
models::{
TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
},
shard::{ShardStripeSize, TenantShardId},
use pageserver_api::controller_api::{
NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
TenantCreateResponse, TenantLocateResponse, TenantShardMigrateRequest,
TenantShardMigrateResponse,
};
use pageserver_api::models::{
TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
};
use pageserver_api::shard::{ShardStripeSize, TenantShardId};
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use postgres_backend::AuthType;
use reqwest::Method;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{
ffi::OsStr,
fs,
net::SocketAddr,
path::PathBuf,
process::ExitStatus,
str::FromStr,
sync::OnceLock,
time::{Duration, Instant},
};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::process::Command;
use tracing::instrument;
use url::Url;
use utils::{
auth::{encode_from_key_file, Claims, Scope},
id::{NodeId, TenantId},
};
use utils::auth::{Claims, Scope, encode_from_key_file};
use utils::id::{NodeId, TenantId};
use whoami::username;
use crate::background_process;
use crate::local_env::{LocalEnv, NeonStorageControllerConf};
pub struct StorageController {
env: LocalEnv,
private_key: Option<Vec<u8>>,
@@ -96,7 +91,8 @@ pub struct AttachHookRequest {
#[derive(Serialize, Deserialize)]
pub struct AttachHookResponse {
pub gen: Option<u32>,
#[serde(rename = "gen")]
pub generation: Option<u32>,
}
#[derive(Serialize, Deserialize)]
@@ -541,6 +537,10 @@ impl StorageController {
args.push("--start-as-candidate".to_string());
}
if self.config.load_safekeepers {
args.push("--load-safekeepers".to_string());
}
if let Some(private_key) = &self.private_key {
let claims = Claims::new(None, Scope::PageServerApi);
let jwt_token =
@@ -779,7 +779,7 @@ impl StorageController {
)
.await?;
Ok(response.gen)
Ok(response.generation)
}
#[instrument(skip(self))]

View File

@@ -1,34 +1,27 @@
use futures::StreamExt;
use std::{
collections::{HashMap, HashSet},
str::FromStr,
time::Duration,
};
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::time::Duration;
use clap::{Parser, Subcommand};
use pageserver_api::{
controller_api::{
AvailabilityZone, NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse,
SafekeeperDescribeResponse, SafekeeperSchedulingPolicyRequest, ShardSchedulingPolicy,
ShardsPreferredAzsRequest, ShardsPreferredAzsResponse, SkSchedulingPolicy,
TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
},
models::{
EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
ShardParameters, TenantConfig, TenantConfigPatchRequest, TenantConfigRequest,
TenantShardSplitRequest, TenantShardSplitResponse,
},
shard::{ShardStripeSize, TenantShardId},
use futures::StreamExt;
use pageserver_api::controller_api::{
AvailabilityZone, NodeAvailabilityWrapper, NodeConfigureRequest, NodeDescribeResponse,
NodeRegisterRequest, NodeSchedulingPolicy, NodeShardResponse, PlacementPolicy,
SafekeeperDescribeResponse, SafekeeperSchedulingPolicyRequest, ShardSchedulingPolicy,
ShardsPreferredAzsRequest, ShardsPreferredAzsResponse, SkSchedulingPolicy, TenantCreateRequest,
TenantDescribeResponse, TenantPolicyRequest, TenantShardMigrateRequest,
TenantShardMigrateResponse,
};
use pageserver_api::models::{
EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary, ShardParameters,
TenantConfig, TenantConfigPatchRequest, TenantConfigRequest, TenantShardSplitRequest,
TenantShardSplitResponse,
};
use pageserver_api::shard::{ShardStripeSize, TenantShardId};
use pageserver_client::mgmt_api::{self};
use reqwest::{Method, StatusCode, Url};
use utils::id::{NodeId, TenantId, TimelineId};
use pageserver_api::controller_api::{
NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
TenantShardMigrateRequest, TenantShardMigrateResponse,
};
use storage_controller_client::control_api::Client;
use utils::id::{NodeId, TenantId, TimelineId};
#[derive(Subcommand, Debug)]
enum Command {
@@ -921,7 +914,9 @@ async fn main() -> anyhow::Result<()> {
}
Command::TenantDrop { tenant_id, unclean } => {
if !unclean {
anyhow::bail!("This command is not a tenant deletion, and uncleanly drops all controller state for the tenant. If you know what you're doing, add `--unclean` to proceed.")
anyhow::bail!(
"This command is not a tenant deletion, and uncleanly drops all controller state for the tenant. If you know what you're doing, add `--unclean` to proceed."
)
}
storcon_client
.dispatch::<(), ()>(
@@ -933,7 +928,9 @@ async fn main() -> anyhow::Result<()> {
}
Command::NodeDrop { node_id, unclean } => {
if !unclean {
anyhow::bail!("This command is not a clean node decommission, and uncleanly drops all controller state for the node, without checking if any tenants still refer to it. If you know what you're doing, add `--unclean` to proceed.")
anyhow::bail!(
"This command is not a clean node decommission, and uncleanly drops all controller state for the node, without checking if any tenants still refer to it. If you know what you're doing, add `--unclean` to proceed."
)
}
storcon_client
.dispatch::<(), ()>(Method::POST, format!("debug/v1/node/{node_id}/drop"), None)

View File

@@ -7,7 +7,7 @@ index f255fe6..0a0fa65 100644
GENERATED_SCHEDULE_DEPS = $(TB_DIR)/all_tests $(TB_DIR)/exclude_tests
REGRESS = --schedule $(TB_DIR)/run.sch # Set this again just to be safe
-REGRESS_OPTS = --inputdir=test --max-connections=$(PARALLEL_CONN) --schedule $(SETUP_SCH) $(REGRESS_CONF)
+REGRESS_OPTS = --use-existing --dbname=pgtap_regression --inputdir=test --max-connections=$(PARALLEL_CONN) --schedule $(SETUP_SCH) $(REGRESS_CONF)
+REGRESS_OPTS = --use-existing --dbname=contrib_regression --inputdir=test --max-connections=$(PARALLEL_CONN) --schedule $(SETUP_SCH) $(REGRESS_CONF)
SETUP_SCH = test/schedule/main.sch # schedule to use for test setup; this can be forcibly changed by some targets!
IGNORE_TESTS = $(notdir $(EXCLUDE_TEST_FILES:.sql=))
PARALLEL_TESTS = $(filter-out $(IGNORE_TESTS),$(filter-out $(SERIAL_TESTS),$(ALL_TESTS)))

View File

@@ -6,12 +6,16 @@ generate_id() {
local -n resvar=$1
printf -v resvar '%08x%08x%08x%08x' $SRANDOM $SRANDOM $SRANDOM $SRANDOM
}
if [ -z ${OLD_COMPUTE_TAG+x} ] || [ -z ${NEW_COMPUTE_TAG+x} ] || [ -z "${OLD_COMPUTE_TAG}" ] || [ -z "${NEW_COMPUTE_TAG}" ]; then
echo OLD_COMPUTE_TAG and NEW_COMPUTE_TAG must be defined
echo "${OLD_COMPUTE_TAG}"
echo "${NEW_COMPUTE_TAG}"
echo "${TEST_EXTENSIONS_TAG}"
if [ -z "${OLD_COMPUTE_TAG:-}" ] || [ -z "${NEW_COMPUTE_TAG:-}" ] || [ -z "${TEST_EXTENSIONS_TAG:-}" ]; then
echo OLD_COMPUTE_TAG, NEW_COMPUTE_TAG and TEST_EXTENSIONS_TAG must be set
exit 1
fi
export PG_VERSION=${PG_VERSION:-16}
export PG_TEST_VERSION=${PG_VERSION}
# Waits for compute node is ready
function wait_for_ready {
TIME=0
while ! docker compose logs compute_is_ready | grep -q "accepting connections" && [ ${TIME} -le 300 ] ; do
@@ -23,11 +27,45 @@ function wait_for_ready {
exit 2
fi
}
# Creates extensions. Gets a string with space-separated extensions as a parameter
function create_extensions() {
for ext in ${1}; do
docker compose exec neon-test-extensions psql -X -v ON_ERROR_STOP=1 -d contrib_regression -c "CREATE EXTENSION IF NOT EXISTS ${ext} CASCADE"
done
}
# Creates a new timeline. Gets the parent ID and an extension name as parameters.
# Saves the timeline ID in the variable EXT_TIMELINE
function create_timeline() {
generate_id new_timeline_id
PARAMS=(
-sbf
-X POST
-H "Content-Type: application/json"
-d "{\"new_timeline_id\": \"${new_timeline_id}\", \"pg_version\": ${PG_VERSION}, \"ancestor_timeline_id\": \"${1}\"}"
"http://127.0.0.1:9898/v1/tenant/${tenant_id}/timeline/"
)
result=$(curl "${PARAMS[@]}")
echo $result | jq .
EXT_TIMELINE[${2}]=${new_timeline_id}
}
# Checks if the timeline ID of the compute node is expected. Gets the timeline ID as a parameter
function check_timeline() {
TID=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.timeline_id")
if [ "${TID}" != "${1}" ]; then
echo Timeline mismatch
exit 1
fi
}
# Restarts the compute node with the required compute tag and timeline.
# Accepts the tag for the compute node and the timeline as parameters.
function restart_compute() {
docker compose down compute compute_is_ready
COMPUTE_TAG=${1} TENANT_ID=${tenant_id} TIMELINE_ID=${2} docker compose up --quiet-pull -d --build compute compute_is_ready
wait_for_ready
check_timeline ${2}
}
declare -A EXT_TIMELINE
EXTENSIONS='[
{"extname": "plv8", "extdir": "plv8-src"},
{"extname": "vector", "extdir": "pgvector-src"},
@@ -59,8 +97,10 @@ COMPUTE_TAG=${OLD_COMPUTE_TAG} docker compose --profile test-extensions up --qui
wait_for_ready
docker compose exec neon-test-extensions psql -c "DROP DATABASE IF EXISTS contrib_regression"
docker compose exec neon-test-extensions psql -c "CREATE DATABASE contrib_regression"
docker compose exec neon-test-extensions psql -c "CREATE DATABASE pgtap_regression"
docker compose exec neon-test-extensions psql -d pgtap_regression -c "CREATE EXTENSION pgtap"
tenant_id=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.tenant_id")
EXT_TIMELINE["main"]=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.timeline_id")
create_timeline "${EXT_TIMELINE["main"]}" init
restart_compute "${OLD_COMPUTE_TAG}" "${EXT_TIMELINE["init"]}"
create_extensions "${EXTNAMES}"
if [ "${FORCE_ALL_UPGRADE_TESTS:-false}" = true ]; then
exts="${EXTNAMES}"
@@ -71,29 +111,13 @@ fi
if [ -z "${exts}" ]; then
echo "No extensions were upgraded"
else
tenant_id=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.tenant_id")
timeline_id=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.timeline_id")
for ext in ${exts}; do
echo Testing ${ext}...
create_timeline "${EXT_TIMELINE["main"]}" ${ext}
EXTDIR=$(echo ${EXTENSIONS} | jq -r '.[] | select(.extname=="'${ext}'") | .extdir')
generate_id new_timeline_id
PARAMS=(
-sbf
-X POST
-H "Content-Type: application/json"
-d "{\"new_timeline_id\": \"${new_timeline_id}\", \"pg_version\": ${PG_VERSION}, \"ancestor_timeline_id\": \"${timeline_id}\"}"
"http://127.0.0.1:9898/v1/tenant/${tenant_id}/timeline/"
)
result=$(curl "${PARAMS[@]}")
echo $result | jq .
TENANT_ID=${tenant_id} TIMELINE_ID=${new_timeline_id} COMPUTE_TAG=${OLD_COMPUTE_TAG} docker compose down compute compute_is_ready
COMPUTE_TAG=${NEW_COMPUTE_TAG} TENANT_ID=${tenant_id} TIMELINE_ID=${new_timeline_id} docker compose up --quiet-pull -d --build compute compute_is_ready
wait_for_ready
TID=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.timeline_id")
if [ ${TID} != ${new_timeline_id} ]; then
echo Timeline mismatch
exit 1
fi
restart_compute "${OLD_COMPUTE_TAG}" "${EXT_TIMELINE[${ext}]}"
docker compose exec neon-test-extensions psql -d contrib_regression -c "CREATE EXTENSION ${ext} CASCADE"
restart_compute "${NEW_COMPUTE_TAG}" "${EXT_TIMELINE[${ext}]}"
docker compose exec neon-test-extensions psql -d contrib_regression -c "\dx ${ext}"
if ! docker compose exec neon-test-extensions sh -c /ext-src/${EXTDIR}/test-upgrade.sh; then
docker compose exec neon-test-extensions cat /ext-src/${EXTDIR}/regression.diffs

View File

@@ -134,8 +134,10 @@ pub struct CatalogObjects {
pub databases: Vec<Database>,
}
#[derive(Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ComputeCtlConfig {
/// Set of JSON web keys that the compute can use to authenticate
/// communication from the control plane.
pub jwks: JwkSet,
}

View File

@@ -101,6 +101,17 @@ pub struct ComputeSpec {
pub timeline_id: Option<TimelineId>,
pub pageserver_connstring: Option<String>,
/// Safekeeper membership config generation. It is put in
/// neon.safekeepers GUC and serves two purposes:
/// 1) Non zero value forces walproposer to use membership configurations.
/// 2) If walproposer wants to update list of safekeepers to connect to
/// taking them from some safekeeper mconf, it should check what value
/// is newer by comparing the generation.
///
/// Note: it could be SafekeeperGeneration, but this needs linking
/// compute_ctl with postgres_ffi.
#[serde(default)]
pub safekeepers_generation: Option<u32>,
#[serde(default)]
pub safekeeper_connstrings: Vec<String>,
@@ -144,6 +155,16 @@ pub struct ComputeSpec {
/// over the same replication content from publisher.
#[serde(default)] // Default false
pub drop_subscriptions_before_start: bool,
/// Log level for audit logging:
///
/// Disabled - no audit logging. This is the default.
/// log - log masked statements to the postgres log using pgaudit extension
/// hipaa - log unmasked statements to the file using pgaudit and pgauditlogtofile extension
///
/// Extensions should be present in shared_preload_libraries
#[serde(default)]
pub audit_log_level: ComputeAudit,
}
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
@@ -251,6 +272,17 @@ pub enum ComputeMode {
Replica,
}
/// Log level for audit logging
/// Disabled, log, hipaa
/// Default is Disabled
#[derive(Clone, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
pub enum ComputeAudit {
#[default]
Disabled,
Log,
Hipaa,
}
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
pub struct Cluster {
pub cluster_id: Option<String>,

View File

@@ -1,7 +1,7 @@
[package]
name = "consumption_metrics"
version = "0.1.0"
edition = "2021"
edition = "2024"
license = "Apache-2.0"
[dependencies]

View File

@@ -1,4 +1,5 @@
use std::{collections::VecDeque, sync::Arc};
use std::collections::VecDeque;
use std::sync::Arc;
use parking_lot::{Mutex, MutexGuard};

View File

@@ -1,11 +1,7 @@
use std::{
panic::AssertUnwindSafe,
sync::{
atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering},
mpsc, Arc, OnceLock,
},
thread::JoinHandle,
};
use std::panic::AssertUnwindSafe;
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, Ordering};
use std::sync::{Arc, OnceLock, mpsc};
use std::thread::JoinHandle;
use tracing::{debug, error, trace};

View File

@@ -1,26 +1,19 @@
use std::{
cmp::Ordering,
collections::{BinaryHeap, VecDeque},
fmt::{self, Debug},
ops::DerefMut,
sync::{mpsc, Arc},
};
use std::cmp::Ordering;
use std::collections::{BinaryHeap, VecDeque};
use std::fmt::{self, Debug};
use std::ops::DerefMut;
use std::sync::{Arc, mpsc};
use parking_lot::{
lock_api::{MappedMutexGuard, MutexGuard},
Mutex, RawMutex,
};
use parking_lot::lock_api::{MappedMutexGuard, MutexGuard};
use parking_lot::{Mutex, RawMutex};
use rand::rngs::StdRng;
use tracing::debug;
use crate::{
executor::{self, ThreadContext},
options::NetworkOptions,
proto::NetEvent,
proto::NodeEvent,
};
use super::{chan::Chan, proto::AnyMessage};
use super::chan::Chan;
use super::proto::AnyMessage;
use crate::executor::{self, ThreadContext};
use crate::options::NetworkOptions;
use crate::proto::{NetEvent, NodeEvent};
pub struct NetworkTask {
options: Arc<NetworkOptions>,

View File

@@ -2,14 +2,11 @@ use std::sync::Arc;
use rand::Rng;
use super::chan::Chan;
use super::network::TCP;
use super::world::{Node, NodeId, World};
use crate::proto::NodeEvent;
use super::{
chan::Chan,
network::TCP,
world::{Node, NodeId, World},
};
/// Abstraction with all functions (aka syscalls) available to the node.
#[derive(Clone)]
pub struct NodeOs {

View File

@@ -1,4 +1,5 @@
use rand::{rngs::StdRng, Rng};
use rand::Rng;
use rand::rngs::StdRng;
/// Describes random delays and failures. Delay will be uniformly distributed in [min, max].
/// Connection failure will occur with the probablity fail_prob.

View File

@@ -3,7 +3,8 @@ use std::fmt::Debug;
use bytes::Bytes;
use utils::lsn::Lsn;
use crate::{network::TCP, world::NodeId};
use crate::network::TCP;
use crate::world::NodeId;
/// Internal node events.
#[derive(Debug)]

View File

@@ -1,12 +1,8 @@
use std::{
cmp::Ordering,
collections::BinaryHeap,
ops::DerefMut,
sync::{
atomic::{AtomicU32, AtomicU64},
Arc,
},
};
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::ops::DerefMut;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64};
use parking_lot::Mutex;
use tracing::trace;

View File

@@ -1,19 +1,18 @@
use std::ops::DerefMut;
use std::sync::{Arc, mpsc};
use parking_lot::Mutex;
use rand::{rngs::StdRng, SeedableRng};
use std::{
ops::DerefMut,
sync::{mpsc, Arc},
};
use rand::SeedableRng;
use rand::rngs::StdRng;
use crate::{
executor::{ExternalHandle, Runtime},
network::NetworkTask,
options::NetworkOptions,
proto::{NodeEvent, SimEvent},
time::Timing,
};
use super::{chan::Chan, network::TCP, node_os::NodeOs};
use super::chan::Chan;
use super::network::TCP;
use super::node_os::NodeOs;
use crate::executor::{ExternalHandle, Runtime};
use crate::network::NetworkTask;
use crate::options::NetworkOptions;
use crate::proto::{NodeEvent, SimEvent};
use crate::time::Timing;
pub type NodeId = u32;

View File

@@ -1,14 +1,15 @@
//! Simple test to verify that simulator is working.
#[cfg(test)]
mod reliable_copy_test {
use std::sync::Arc;
use anyhow::Result;
use desim::executor::{self, PollSome};
use desim::node_os::NodeOs;
use desim::options::{Delay, NetworkOptions};
use desim::proto::{NetEvent, NodeEvent, ReplCell};
use desim::proto::{AnyMessage, NetEvent, NodeEvent, ReplCell};
use desim::world::{NodeId, World};
use desim::{node_os::NodeOs, proto::AnyMessage};
use parking_lot::Mutex;
use std::sync::Arc;
use tracing::info;
/// Disk storage trait and implementation.

View File

@@ -6,11 +6,8 @@ license.workspace = true
[dependencies]
anyhow.workspace = true
backtrace.workspace = true
bytes.workspace = true
inferno.workspace = true
fail.workspace = true
flate2.workspace = true
hyper0.workspace = true
itertools.workspace = true
jemalloc_pprof.workspace = true

View File

@@ -1,30 +1,28 @@
use crate::error::{api_error_handler, route_error_handler, ApiError};
use crate::pprof;
use crate::request::{get_query_param, parse_query_param};
use ::pprof::protos::Message as _;
use ::pprof::ProfilerGuardBuilder;
use anyhow::{anyhow, Context};
use bytes::{Bytes, BytesMut};
use hyper::header::{HeaderName, AUTHORIZATION, CONTENT_DISPOSITION};
use hyper::http::HeaderValue;
use hyper::Method;
use hyper::{header::CONTENT_TYPE, Body, Request, Response};
use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder};
use once_cell::sync::Lazy;
use regex::Regex;
use routerify::ext::RequestExt;
use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
use tokio::sync::{mpsc, Mutex, Notify};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::io::ReaderStream;
use tracing::{debug, info, info_span, warn, Instrument};
use utils::auth::{AuthError, Claims, SwappableJwtAuth};
use std::future::Future;
use std::io::Write as _;
use std::str::FromStr;
use std::time::Duration;
use anyhow::{Context, anyhow};
use bytes::{Bytes, BytesMut};
use hyper::header::{AUTHORIZATION, CONTENT_DISPOSITION, CONTENT_TYPE, HeaderName};
use hyper::http::HeaderValue;
use hyper::{Body, Method, Request, Response};
use metrics::{Encoder, IntCounter, TextEncoder, register_int_counter};
use once_cell::sync::Lazy;
use pprof::ProfilerGuardBuilder;
use pprof::protos::Message as _;
use routerify::ext::RequestExt;
use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
use tokio::sync::{Mutex, Notify, mpsc};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::io::ReaderStream;
use tracing::{Instrument, debug, info, info_span, warn};
use utils::auth::{AuthError, Claims, SwappableJwtAuth};
use crate::error::{ApiError, api_error_handler, route_error_handler};
use crate::request::{get_query_param, parse_query_param};
static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"libmetrics_metric_handler_requests_total",
@@ -375,7 +373,7 @@ pub async fn profile_cpu_handler(req: Request<Body>) -> Result<Response<Body>, A
Err(_) => {
return Err(ApiError::Conflict(
"profiler already running (use ?force=true to cancel it)".into(),
))
));
}
}
tokio::time::sleep(Duration::from_millis(1)).await; // don't busy-wait
@@ -449,20 +447,6 @@ pub async fn profile_heap_handler(req: Request<Body>) -> Result<Response<Body>,
Some(format) => return Err(ApiError::BadRequest(anyhow!("invalid format {format}"))),
};
// Functions and mappings to strip when symbolizing pprof profiles. If true,
// also remove child frames.
static STRIP_FUNCTIONS: Lazy<Vec<(Regex, bool)>> = Lazy::new(|| {
vec![
(Regex::new("^__rust").unwrap(), false),
(Regex::new("^_start$").unwrap(), false),
(Regex::new("^irallocx_prof").unwrap(), true),
(Regex::new("^prof_alloc_prep").unwrap(), true),
(Regex::new("^std::rt::lang_start").unwrap(), false),
(Regex::new("^std::sys::backtrace::__rust").unwrap(), false),
]
});
const STRIP_MAPPINGS: &[&str] = &["libc", "libgcc", "pthread", "vdso"];
// Obtain profiler handle.
let mut prof_ctl = jemalloc_pprof::PROF_CTL
.as_ref()
@@ -495,52 +479,34 @@ pub async fn profile_heap_handler(req: Request<Body>) -> Result<Response<Body>,
}
Format::Pprof => {
let data = tokio::task::spawn_blocking(move || {
let bytes = prof_ctl.dump_pprof()?;
// Symbolize the profile.
// TODO: consider moving this upstream to jemalloc_pprof and avoiding the
// serialization roundtrip.
let profile = pprof::decode(&bytes)?;
let profile = pprof::symbolize(profile)?;
let profile = pprof::strip_locations(profile, STRIP_MAPPINGS, &STRIP_FUNCTIONS);
pprof::encode(&profile)
})
.await
.map_err(|join_err| ApiError::InternalServerError(join_err.into()))?
.map_err(ApiError::InternalServerError)?;
let data = tokio::task::spawn_blocking(move || prof_ctl.dump_pprof())
.await
.map_err(|join_err| ApiError::InternalServerError(join_err.into()))?
.map_err(ApiError::InternalServerError)?;
Response::builder()
.status(200)
.header(CONTENT_TYPE, "application/octet-stream")
.header(CONTENT_DISPOSITION, "attachment; filename=\"heap.pb\"")
.header(CONTENT_DISPOSITION, "attachment; filename=\"heap.pb.gz\"")
.body(Body::from(data))
.map_err(|err| ApiError::InternalServerError(err.into()))
}
Format::Svg => {
let body = tokio::task::spawn_blocking(move || {
let bytes = prof_ctl.dump_pprof()?;
let profile = pprof::decode(&bytes)?;
let profile = pprof::symbolize(profile)?;
let profile = pprof::strip_locations(profile, STRIP_MAPPINGS, &STRIP_FUNCTIONS);
let mut opts = inferno::flamegraph::Options::default();
opts.title = "Heap inuse".to_string();
opts.count_name = "bytes".to_string();
pprof::flamegraph(profile, &mut opts)
})
.await
.map_err(|join_err| ApiError::InternalServerError(join_err.into()))?
.map_err(ApiError::InternalServerError)?;
let svg = tokio::task::spawn_blocking(move || prof_ctl.dump_flamegraph())
.await
.map_err(|join_err| ApiError::InternalServerError(join_err.into()))?
.map_err(ApiError::InternalServerError)?;
Response::builder()
.status(200)
.header(CONTENT_TYPE, "image/svg+xml")
.body(Body::from(body))
.body(Body::from(svg))
.map_err(|err| ApiError::InternalServerError(err.into()))
}
}
}
pub fn add_request_id_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
) -> Middleware<B, ApiError> {
pub fn add_request_id_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>()
-> Middleware<B, ApiError> {
Middleware::pre(move |req| async move {
let request_id = match req.headers().get(&X_REQUEST_ID_HEADER) {
Some(request_id) => request_id
@@ -664,7 +630,7 @@ pub fn auth_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
None => {
return Err(ApiError::Unauthorized(
"missing authorization header".to_string(),
))
));
}
}
}
@@ -717,12 +683,14 @@ pub fn check_permission_with(
#[cfg(test)]
mod tests {
use super::*;
use hyper::service::Service;
use routerify::RequestServiceBuilder;
use std::future::poll_fn;
use std::net::{IpAddr, SocketAddr};
use hyper::service::Service;
use routerify::RequestServiceBuilder;
use super::*;
#[tokio::test]
async fn test_request_id_returned() {
let builder = RequestServiceBuilder::new(make_router().build().unwrap()).unwrap();

View File

@@ -1,10 +1,10 @@
use hyper::{header, Body, Response, StatusCode};
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::error::Error as StdError;
use hyper::{Body, Response, StatusCode, header};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::{error, info, warn};
use utils::auth::AuthError;
#[derive(Debug, Error)]

View File

@@ -1,12 +1,11 @@
use crate::error::ApiError;
use crate::json::{json_request, json_response};
use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use utils::failpoint_support::apply_failpoint;
use crate::error::ApiError;
use crate::json::{json_request, json_response};
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
/// Information for configuring a single fail point

View File

@@ -1,6 +1,6 @@
use anyhow::Context;
use bytes::Buf;
use hyper::{header, Body, Request, Response, StatusCode};
use hyper::{Body, Request, Response, StatusCode, header};
use serde::{Deserialize, Serialize};
use super::error::ApiError;

View File

@@ -2,11 +2,10 @@ pub mod endpoint;
pub mod error;
pub mod failpoints;
pub mod json;
pub mod pprof;
pub mod request;
extern crate hyper0 as hyper;
/// Current fast way to apply simple http routing in various Neon binaries.
/// Re-exported for sake of uniform approach, that could be later replaced with better alternatives, if needed.
pub use routerify::{ext::RequestExt, RouterBuilder, RouterService};
pub use routerify::{RouterBuilder, RouterService, ext::RequestExt};

View File

@@ -1,238 +0,0 @@
use anyhow::bail;
use flate2::write::{GzDecoder, GzEncoder};
use flate2::Compression;
use itertools::Itertools as _;
use pprof::protos::{Function, Line, Location, Message as _, Profile};
use regex::Regex;
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::ffi::c_void;
use std::io::Write as _;
/// Decodes a gzip-compressed Protobuf-encoded pprof profile.
pub fn decode(bytes: &[u8]) -> anyhow::Result<Profile> {
let mut gz = GzDecoder::new(Vec::new());
gz.write_all(bytes)?;
Ok(Profile::parse_from_bytes(&gz.finish()?)?)
}
/// Encodes a pprof profile as gzip-compressed Protobuf.
pub fn encode(profile: &Profile) -> anyhow::Result<Vec<u8>> {
let mut gz = GzEncoder::new(Vec::new(), Compression::default());
profile.write_to_writer(&mut gz)?;
Ok(gz.finish()?)
}
/// Symbolizes a pprof profile using the current binary.
pub fn symbolize(mut profile: Profile) -> anyhow::Result<Profile> {
if !profile.function.is_empty() {
return Ok(profile); // already symbolized
}
// Collect function names.
let mut functions: HashMap<String, Function> = HashMap::new();
let mut strings: HashMap<String, i64> = profile
.string_table
.into_iter()
.enumerate()
.map(|(i, s)| (s, i as i64))
.collect();
// Helper to look up or register a string.
let mut string_id = |s: &str| -> i64 {
// Don't use .entry() to avoid unnecessary allocations.
if let Some(id) = strings.get(s) {
return *id;
}
let id = strings.len() as i64;
strings.insert(s.to_string(), id);
id
};
for loc in &mut profile.location {
if !loc.line.is_empty() {
continue;
}
// Resolve the line and function for each location.
backtrace::resolve(loc.address as *mut c_void, |symbol| {
let Some(symbol_name) = symbol.name() else {
return;
};
let function_name = format!("{symbol_name:#}");
let functions_len = functions.len();
let function_id = functions
.entry(function_name)
.or_insert_with_key(|function_name| {
let function_id = functions_len as u64 + 1;
let system_name = String::from_utf8_lossy(symbol_name.as_bytes());
let filename = symbol
.filename()
.map(|path| path.to_string_lossy())
.unwrap_or(Cow::Borrowed(""));
Function {
id: function_id,
name: string_id(function_name),
system_name: string_id(&system_name),
filename: string_id(&filename),
..Default::default()
}
})
.id;
loc.line.push(Line {
function_id,
line: symbol.lineno().unwrap_or(0) as i64,
..Default::default()
});
});
}
// Store the resolved functions, and mark the mapping as resolved.
profile.function = functions.into_values().sorted_by_key(|f| f.id).collect();
profile.string_table = strings
.into_iter()
.sorted_by_key(|(_, i)| *i)
.map(|(s, _)| s)
.collect();
for mapping in &mut profile.mapping {
mapping.has_functions = true;
mapping.has_filenames = true;
}
Ok(profile)
}
/// Strips locations (stack frames) matching the given mappings (substring) or function names
/// (regex). The function bool specifies whether child frames should be stripped as well.
///
/// The string definitions are left behind in the profile for simplicity, to avoid rewriting all
/// string references.
pub fn strip_locations(
mut profile: Profile,
mappings: &[&str],
functions: &[(Regex, bool)],
) -> Profile {
// Strip mappings.
let mut strip_mappings: HashSet<u64> = HashSet::new();
profile.mapping.retain(|mapping| {
let Some(name) = profile.string_table.get(mapping.filename as usize) else {
return true;
};
if mappings.iter().any(|substr| name.contains(substr)) {
strip_mappings.insert(mapping.id);
return false;
}
true
});
// Strip functions.
let mut strip_functions: HashMap<u64, bool> = HashMap::new();
profile.function.retain(|function| {
let Some(name) = profile.string_table.get(function.name as usize) else {
return true;
};
for (regex, strip_children) in functions {
if regex.is_match(name) {
strip_functions.insert(function.id, *strip_children);
return false;
}
}
true
});
// Strip locations. The bool specifies whether child frames should be stripped too.
let mut strip_locations: HashMap<u64, bool> = HashMap::new();
profile.location.retain(|location| {
for line in &location.line {
if let Some(strip_children) = strip_functions.get(&line.function_id) {
strip_locations.insert(location.id, *strip_children);
return false;
}
}
if strip_mappings.contains(&location.mapping_id) {
strip_locations.insert(location.id, false);
return false;
}
true
});
// Strip sample locations.
for sample in &mut profile.sample {
// First, find the uppermost function with child removal and truncate the stack.
if let Some(truncate) = sample
.location_id
.iter()
.rposition(|id| strip_locations.get(id) == Some(&true))
{
sample.location_id.drain(..=truncate);
}
// Next, strip any individual frames without child removal.
sample
.location_id
.retain(|id| !strip_locations.contains_key(id));
}
profile
}
/// Generates an SVG flamegraph from a symbolized pprof profile.
pub fn flamegraph(
profile: Profile,
opts: &mut inferno::flamegraph::Options,
) -> anyhow::Result<Vec<u8>> {
if profile.mapping.iter().any(|m| !m.has_functions) {
bail!("profile not symbolized");
}
// Index locations, functions, and strings.
let locations: HashMap<u64, Location> =
profile.location.into_iter().map(|l| (l.id, l)).collect();
let functions: HashMap<u64, Function> =
profile.function.into_iter().map(|f| (f.id, f)).collect();
let strings = profile.string_table;
// Resolve stacks as function names, and sum sample values per stack. Also reverse the stack,
// since inferno expects it bottom-up.
let mut stacks: HashMap<Vec<&str>, i64> = HashMap::new();
for sample in profile.sample {
let mut stack = Vec::with_capacity(sample.location_id.len());
for location in sample.location_id.into_iter().rev() {
let Some(location) = locations.get(&location) else {
bail!("missing location {location}");
};
for line in location.line.iter().rev() {
let Some(function) = functions.get(&line.function_id) else {
bail!("missing function {}", line.function_id);
};
let Some(name) = strings.get(function.name as usize) else {
bail!("missing string {}", function.name);
};
stack.push(name.as_str());
}
}
let Some(&value) = sample.value.first() else {
bail!("missing value");
};
*stacks.entry(stack).or_default() += value;
}
// Construct stack lines for inferno.
let lines = stacks
.into_iter()
.map(|(stack, value)| (stack.into_iter().join(";"), value))
.map(|(stack, value)| format!("{stack} {value}"))
.sorted()
.collect_vec();
// Construct the flamegraph.
let mut bytes = Vec::new();
let lines = lines.iter().map(|line| line.as_str());
inferno::flamegraph::from_lines(opts, lines, &mut bytes)?;
Ok(bytes)
}

View File

@@ -1,10 +1,13 @@
use core::fmt;
use std::{borrow::Cow, str::FromStr};
use std::borrow::Cow;
use std::str::FromStr;
use anyhow::anyhow;
use hyper::body::HttpBody;
use hyper::{Body, Request};
use routerify::ext::RequestExt;
use super::error::ApiError;
use anyhow::anyhow;
use hyper::{body::HttpBody, Body, Request};
use routerify::ext::RequestExt;
pub fn get_request_param<'a>(
request: &'a Request<Body>,

View File

@@ -6,17 +6,15 @@
//! Probabilistic cardinality estimators, such as the HyperLogLog algorithm,
//! use significantly less memory than this, but can only approximate the cardinality.
use std::{
hash::{BuildHasher, BuildHasherDefault, Hash},
sync::atomic::AtomicU8,
};
use std::hash::{BuildHasher, BuildHasherDefault, Hash};
use std::sync::atomic::AtomicU8;
use measured::{
label::{LabelGroupVisitor, LabelName, LabelValue, LabelVisitor},
metric::{counter::CounterState, name::MetricNameEncoder, Metric, MetricType, MetricVec},
text::TextEncoder,
LabelGroup,
};
use measured::LabelGroup;
use measured::label::{LabelGroupVisitor, LabelName, LabelValue, LabelVisitor};
use measured::metric::counter::CounterState;
use measured::metric::name::MetricNameEncoder;
use measured::metric::{Metric, MetricType, MetricVec};
use measured::text::TextEncoder;
use twox_hash::xxh3;
/// Create an [`HyperLogLogVec`] and registers to default registry.
@@ -27,9 +25,7 @@ macro_rules! register_hll_vec {
$crate::register(Box::new(hll_vec.clone())).map(|_| hll_vec)
}};
($N:literal, $NAME:expr, $HELP:expr, $LABELS_NAMES:expr $(,)?) => {{
$crate::register_hll_vec!($N, $crate::opts!($NAME, $HELP), $LABELS_NAMES)
}};
($N:literal, $NAME:expr, $HELP:expr, $LABELS_NAMES:expr $(,)?) => {{ $crate::register_hll_vec!($N, $crate::opts!($NAME, $HELP), $LABELS_NAMES) }};
}
/// Create an [`HyperLogLog`] and registers to default registry.
@@ -40,9 +36,7 @@ macro_rules! register_hll {
$crate::register(Box::new(hll.clone())).map(|_| hll)
}};
($N:literal, $NAME:expr, $HELP:expr $(,)?) => {{
$crate::register_hll!($N, $crate::opts!($NAME, $HELP))
}};
($N:literal, $NAME:expr, $HELP:expr $(,)?) => {{ $crate::register_hll!($N, $crate::opts!($NAME, $HELP)) }};
}
/// HLL is a probabilistic cardinality measure.
@@ -195,8 +189,10 @@ impl<W: std::io::Write, const N: usize> measured::metric::MetricEncoding<TextEnc
mod tests {
use std::collections::HashSet;
use measured::{label::StaticLabelSet, FixedCardinalityLabel};
use rand::{rngs::StdRng, Rng, SeedableRng};
use measured::FixedCardinalityLabel;
use measured::label::StaticLabelSet;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use rand_distr::{Distribution, Zipf};
use crate::HyperLogLogVec;

View File

@@ -1,9 +1,10 @@
//! A timestamp captured at process startup to identify restarts of the process, e.g., in logs and metrics.
use std::fmt::Display;
use chrono::Utc;
use super::register_uint_gauge;
use std::fmt::Display;
pub struct LaunchTimestamp(chrono::DateTime<Utc>);

View File

@@ -4,38 +4,26 @@
//! a default registry.
#![deny(clippy::undocumented_unsafe_blocks)]
use measured::{
label::{LabelGroupSet, LabelGroupVisitor, LabelName, NoLabels},
metric::{
counter::CounterState,
gauge::GaugeState,
group::Encoding,
name::{MetricName, MetricNameEncoder},
MetricEncoding, MetricFamilyEncoding,
},
FixedCardinalityLabel, LabelGroup, MetricGroup,
};
use measured::label::{LabelGroupSet, LabelGroupVisitor, LabelName, NoLabels};
use measured::metric::counter::CounterState;
use measured::metric::gauge::GaugeState;
use measured::metric::group::Encoding;
use measured::metric::name::{MetricName, MetricNameEncoder};
use measured::metric::{MetricEncoding, MetricFamilyEncoding};
use measured::{FixedCardinalityLabel, LabelGroup, MetricGroup};
use once_cell::sync::Lazy;
use prometheus::Registry;
use prometheus::core::{
Atomic, AtomicU64, Collector, GenericCounter, GenericCounterVec, GenericGauge, GenericGaugeVec,
};
pub use prometheus::local::LocalHistogram;
pub use prometheus::opts;
pub use prometheus::register;
pub use prometheus::Error;
use prometheus::Registry;
pub use prometheus::{core, default_registry, proto};
pub use prometheus::{exponential_buckets, linear_buckets};
pub use prometheus::{register_counter_vec, Counter, CounterVec};
pub use prometheus::{register_gauge, Gauge};
pub use prometheus::{register_gauge_vec, GaugeVec};
pub use prometheus::{register_histogram, Histogram};
pub use prometheus::{register_histogram_vec, HistogramVec};
pub use prometheus::{register_int_counter, IntCounter};
pub use prometheus::{register_int_counter_vec, IntCounterVec};
pub use prometheus::{register_int_gauge, IntGauge};
pub use prometheus::{register_int_gauge_vec, IntGaugeVec};
pub use prometheus::{Encoder, TextEncoder};
pub use prometheus::{
Counter, CounterVec, Encoder, Error, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter,
IntCounterVec, IntGauge, IntGaugeVec, TextEncoder, core, default_registry, exponential_buckets,
linear_buckets, opts, proto, register, register_counter_vec, register_gauge,
register_gauge_vec, register_histogram, register_histogram_vec, register_int_counter,
register_int_counter_vec, register_int_gauge, register_int_gauge_vec,
};
pub mod launch_timestamp;
mod wrappers;

View File

@@ -1165,6 +1165,21 @@ pub struct OffloadedTimelineInfo {
pub archived_at: chrono::DateTime<chrono::Utc>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum RelSizeMigration {
/// The tenant is using the old rel_size format.
/// Note that this enum is persisted as `Option<RelSizeMigration>` in the index part, so
/// `None` is the same as `Some(RelSizeMigration::Legacy)`.
Legacy,
/// The tenant is migrating to the new rel_size format. Both old and new rel_size format are
/// persisted in the index part. The read path will read both formats and merge them.
Migrating,
/// The tenant has migrated to the new rel_size format. Only the new rel_size format is persisted
/// in the index part, and the read path will not read the old format.
Migrated,
}
/// This represents the output of the "timeline_detail" and "timeline_list" API calls.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TimelineInfo {
@@ -1243,7 +1258,11 @@ pub struct TimelineInfo {
// Forward compatibility: a previous version of the pageserver will receive a JSON. serde::Deserialize does
// not deny unknown fields by default so it's safe to set the field to some value, though it won't be
// read.
/// Whether the timeline is archived.
pub is_archived: Option<bool>,
/// The status of the rel_size migration.
pub rel_size_migration: Option<RelSizeMigration>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]

View File

@@ -4,28 +4,28 @@
//! is rather narrow, but we can extend it once required.
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use anyhow::Context;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::future::Future;
use std::io::ErrorKind;
use std::net::SocketAddr;
use std::os::fd::AsRawFd;
use std::os::fd::RawFd;
use std::os::fd::{AsRawFd, RawFd};
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::task::{ready, Poll};
use std::task::{Poll, ready};
use std::{fmt, io};
use std::{future::Future, str::FromStr};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_rustls::TlsAcceptor;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn};
use anyhow::Context;
use bytes::Bytes;
use pq_proto::framed::{ConnectionError, Framed, FramedReader, FramedWriter};
use pq_proto::{
BeMessage, FeMessage, FeStartupPacket, ProtocolError, SQLSTATE_ADMIN_SHUTDOWN,
SQLSTATE_INTERNAL_ERROR, SQLSTATE_SUCCESSFUL_COMPLETION,
};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_rustls::TlsAcceptor;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn};
/// An error, occurred during query processing:
/// either during the connection ([`ConnectionError`]) or before/after it.
@@ -746,7 +746,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
match e {
QueryError::Shutdown => return Ok(ProcessMsgResult::Break),
QueryError::SimulatedConnectionError => {
return Err(QueryError::SimulatedConnectionError)
return Err(QueryError::SimulatedConnectionError);
}
err @ QueryError::Reconnect => {
// Instruct the client to reconnect, stop processing messages
@@ -1020,7 +1020,9 @@ fn log_query_error(query: &str, e: &QueryError) {
}
}
QueryError::Disconnected(other_connection_error) => {
error!("query handler for '{query}' failed with connection error: {other_connection_error:?}")
error!(
"query handler for '{query}' failed with connection error: {other_connection_error:?}"
)
}
QueryError::SimulatedConnectionError => {
error!("query handler for query '{query}' failed due to a simulated connection error")

View File

@@ -1,10 +1,11 @@
use std::io::Cursor;
use std::sync::Arc;
/// Test postgres_backend_async with tokio_postgres
use once_cell::sync::Lazy;
use postgres_backend::{AuthType, Handler, PostgresBackend, QueryError};
use pq_proto::{BeMessage, RowDescriptor};
use rustls::crypto::ring;
use std::io::Cursor;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream};
use tokio_postgres::config::SslMode;

View File

@@ -1,9 +1,10 @@
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use anyhow::{bail, Context};
use itertools::Itertools;
use std::borrow::Cow;
use std::fmt;
use anyhow::{Context, bail};
use itertools::Itertools;
use url::Host;
/// Parses a string of format either `host:port` or `host` into a corresponding pair.
@@ -29,9 +30,10 @@ pub fn parse_host_port<S: AsRef<str>>(host_port: S) -> Result<(Host, Option<u16>
#[cfg(test)]
mod tests_parse_host_port {
use crate::parse_host_port;
use url::Host;
use crate::parse_host_port;
#[test]
fn test_normal() {
let (host, port) = parse_host_port("hello:123").unwrap();
@@ -207,10 +209,11 @@ impl fmt::Debug for PgConnectionConfig {
#[cfg(test)]
mod tests_pg_connection_config {
use crate::PgConnectionConfig;
use once_cell::sync::Lazy;
use url::Host;
use crate::PgConnectionConfig;
static STUB_HOST: Lazy<Host> = Lazy::new(|| Host::Domain("stub.host.example".to_owned()));
#[test]

View File

@@ -1,6 +1,6 @@
use std::ffi::CStr;
use criterion::{criterion_group, criterion_main, Bencher, Criterion};
use criterion::{Bencher, Criterion, criterion_group, criterion_main};
use postgres_ffi::v17::wal_generator::LogicalMessageGenerator;
use postgres_ffi::v17::waldecoder_handler::WalStreamDecoderHandler;
use postgres_ffi::waldecoder::WalStreamDecoder;

View File

@@ -4,7 +4,7 @@ use std::env;
use std::path::PathBuf;
use std::process::Command;
use anyhow::{anyhow, Context};
use anyhow::{Context, anyhow};
use bindgen::callbacks::{DeriveInfo, ParseCallbacks};
#[derive(Debug)]

View File

@@ -21,7 +21,9 @@ macro_rules! postgres_ffi {
pub mod bindings {
// bindgen generates bindings for a lot of stuff we don't need
#![allow(dead_code)]
#![allow(unsafe_op_in_unsafe_fn)]
#![allow(clippy::undocumented_unsafe_blocks)]
#![allow(clippy::ptr_offset_with_cast)]
use serde::{Deserialize, Serialize};
include!(concat!(
@@ -43,8 +45,7 @@ macro_rules! postgres_ffi {
pub const PG_MAJORVERSION: &str = stringify!($version);
// Re-export some symbols from bindings
pub use bindings::DBState_DB_SHUTDOWNED;
pub use bindings::{CheckPoint, ControlFileData, XLogRecord};
pub use bindings::{CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, XLogRecord};
pub const ZERO_CHECKPOINT: bytes::Bytes =
bytes::Bytes::from_static(&[0u8; xlog_utils::SIZEOF_CHECKPOINT]);
@@ -221,21 +222,17 @@ pub mod relfile_utils;
pub mod walrecord;
// Export some widely used datatypes that are unlikely to change across Postgres versions
pub use v14::bindings::RepOriginId;
pub use v14::bindings::{uint32, uint64, Oid};
pub use v14::bindings::{BlockNumber, OffsetNumber};
pub use v14::bindings::{MultiXactId, TransactionId};
pub use v14::bindings::{TimeLineID, TimestampTz, XLogRecPtr, XLogSegNo};
pub use v14::bindings::{
BlockNumber, CheckPoint, ControlFileData, MultiXactId, OffsetNumber, Oid, PageHeaderData,
RepOriginId, TimeLineID, TimestampTz, TransactionId, XLogRecPtr, XLogRecord, XLogSegNo, uint32,
uint64,
};
// Likewise for these, although the assumption that these don't change is a little more iffy.
pub use v14::bindings::{MultiXactOffset, MultiXactStatus};
pub use v14::bindings::{PageHeaderData, XLogRecord};
pub use v14::xlog_utils::{
XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
};
pub use v14::bindings::{CheckPoint, ControlFileData};
// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and
// --with-segsize=SEGSIZE, but assume the defaults for now.
pub const BLCKSZ: u16 = 8192;
@@ -246,13 +243,11 @@ pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024;
pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;
// Export some version independent functions that are used outside of this mod
pub use v14::xlog_utils::encode_logical_message;
pub use v14::xlog_utils::get_current_timestamp;
pub use v14::xlog_utils::to_pg_timestamp;
pub use v14::xlog_utils::try_from_pg_timestamp;
pub use v14::xlog_utils::XLogFileName;
pub use v14::bindings::DBState_DB_SHUTDOWNED;
pub use v14::xlog_utils::{
XLogFileName, encode_logical_message, get_current_timestamp, to_pg_timestamp,
try_from_pg_timestamp,
};
pub fn bkpimage_is_compressed(bimg_info: u8, version: u32) -> bool {
dispatch_pgversion!(version, pgv::bindings::bkpimg_is_compressed(bimg_info))
@@ -355,8 +350,9 @@ pub fn fsm_logical_to_physical(addr: BlockNumber) -> BlockNumber {
}
pub mod waldecoder {
use bytes::{Buf, Bytes, BytesMut};
use std::num::NonZeroU32;
use bytes::{Buf, Bytes, BytesMut};
use thiserror::Error;
use utils::lsn::Lsn;

View File

@@ -9,8 +9,7 @@
//! comments on them.
//!
use crate::PageHeaderData;
use crate::BLCKSZ;
use crate::{BLCKSZ, PageHeaderData};
//
// From pg_tablespace_d.h

View File

@@ -3,18 +3,16 @@
//!
//! TODO: Generate separate types for each supported PG version
use crate::pg_constants;
use crate::XLogRecord;
use crate::{
BlockNumber, MultiXactId, MultiXactOffset, MultiXactStatus, Oid, RepOriginId, TimestampTz,
TransactionId,
};
use crate::{BLCKSZ, XLOG_SIZE_OF_XLOG_RECORD};
use bytes::{Buf, Bytes};
use serde::{Deserialize, Serialize};
use utils::bin_ser::DeserializeError;
use utils::lsn::Lsn;
use crate::{
BLCKSZ, BlockNumber, MultiXactId, MultiXactOffset, MultiXactStatus, Oid, RepOriginId,
TimestampTz, TransactionId, XLOG_SIZE_OF_XLOG_RECORD, XLogRecord, pg_constants,
};
#[repr(C)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct XlMultiXactCreate {
@@ -508,9 +506,10 @@ pub fn decode_wal_record(
}
pub mod v14 {
use crate::{OffsetNumber, TransactionId};
use bytes::{Buf, Bytes};
use crate::{OffsetNumber, TransactionId};
#[repr(C)]
#[derive(Debug)]
pub struct XlHeapInsert {
@@ -678,9 +677,10 @@ pub mod v15 {
}
pub mod v16 {
use bytes::{Buf, Bytes};
pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert, XlParameterChange};
use crate::{OffsetNumber, TransactionId};
use bytes::{Buf, Bytes};
pub struct XlHeapDelete {
pub xmax: TransactionId,
@@ -746,9 +746,10 @@ pub mod v16 {
/* Since PG16, we have the Neon RMGR (RM_NEON_ID) to manage Neon-flavored WAL. */
pub mod rm_neon {
use crate::{OffsetNumber, TransactionId};
use bytes::{Buf, Bytes};
use crate::{OffsetNumber, TransactionId};
#[repr(C)]
#[derive(Debug)]
pub struct XlNeonHeapInsert {
@@ -858,14 +859,14 @@ pub mod v16 {
}
pub mod v17 {
pub use super::v14::XlHeapLockUpdated;
pub use crate::{TimeLineID, TimestampTz};
use bytes::{Buf, Bytes};
pub use super::v16::rm_neon;
pub use super::v14::XlHeapLockUpdated;
pub use super::v16::{
XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapMultiInsert, XlHeapUpdate, XlParameterChange,
rm_neon,
};
pub use crate::{TimeLineID, TimestampTz};
#[repr(C)]
#[derive(Debug)]

View File

@@ -1,7 +1,9 @@
use std::path::PathBuf;
use std::str::FromStr;
use anyhow::*;
use clap::{value_parser, Arg, ArgMatches, Command};
use clap::{Arg, ArgMatches, Command, value_parser};
use postgres::Client;
use std::{path::PathBuf, str::FromStr};
use wal_craft::*;
fn main() -> Result<()> {

View File

@@ -1,17 +1,18 @@
use anyhow::{bail, ensure};
use camino_tempfile::{tempdir, Utf8TempDir};
use log::*;
use postgres::types::PgLsn;
use postgres::Client;
use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
use postgres_ffi::{
XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
};
use std::ffi::OsStr;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::{Duration, Instant};
use anyhow::{bail, ensure};
use camino_tempfile::{Utf8TempDir, tempdir};
use log::*;
use postgres::Client;
use postgres::types::PgLsn;
use postgres_ffi::{
WAL_SEGMENT_SIZE, XLOG_BLCKSZ, XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD,
XLOG_SIZE_OF_XLOG_SHORT_PHD,
};
macro_rules! xlog_utils_test {
($version:ident) => {
#[path = "."]

View File

@@ -10,11 +10,10 @@
//! calls.
//!
//! [Box]: https://docs.rs/futures-util/0.3.26/src/futures_util/lock/bilock.rs.html#107
use std::future::Future;
use std::io::{self, ErrorKind};
use bytes::{Buf, BytesMut};
use std::{
future::Future,
io::{self, ErrorKind},
};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf};
use crate::{BeMessage, FeMessage, FeStartupPacket, ProtocolError};

View File

@@ -5,14 +5,15 @@
pub mod framed;
use std::borrow::Cow;
use std::{fmt, io, str};
use byteorder::{BigEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::{borrow::Cow, fmt, io, str};
// re-export for use in utils pageserver_feedback.rs
pub use postgres_protocol::PG_EPOCH;
use serde::{Deserialize, Serialize};
pub type Oid = u32;
pub type SystemId = u64;
@@ -206,8 +207,8 @@ use rand::distributions::{Distribution, Standard};
impl Distribution<CancelKeyData> for Standard {
fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> CancelKeyData {
CancelKeyData {
backend_pid: rng.gen(),
cancel_key: rng.gen(),
backend_pid: rng.r#gen(),
cancel_key: rng.r#gen(),
}
}
}
@@ -1035,7 +1036,7 @@ impl BeMessage<'_> {
buf.put_u8(b'd');
write_body(buf, |buf| {
buf.put_u8(b'0'); // matches INTERPRETED_WAL_RECORD_TAG in postgres-protocol
// dependency
// dependency
buf.put_u64(rec.streaming_lsn);
buf.put_u64(rec.commit_lsn);
buf.put_slice(rec.data);

View File

@@ -34,8 +34,13 @@ where
.make_tls_connect(hostname)
.map_err(|e| Error::tls(e.into()))?;
let socket =
connect_socket::connect_socket(&config.host, config.port, config.connect_timeout).await?;
let socket = connect_socket::connect_socket(
config.host_addr,
&config.host,
config.port,
config.connect_timeout,
)
.await?;
cancel_query_raw::cancel_query_raw(socket, ssl_mode, tls, process_id, secret_key).await
}

View File

@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::fmt;
use std::net::IpAddr;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
@@ -137,6 +138,7 @@ impl InnerClient {
#[derive(Clone, Serialize, Deserialize)]
pub struct SocketConfig {
pub host_addr: Option<IpAddr>,
pub host: Host,
pub port: u16,
pub connect_timeout: Option<Duration>,

View File

@@ -1,5 +1,6 @@
//! Connection configuration.
use std::net::IpAddr;
use std::time::Duration;
use std::{fmt, str};
@@ -65,6 +66,7 @@ pub enum AuthKeys {
/// Connection configuration.
#[derive(Clone, PartialEq, Eq)]
pub struct Config {
pub(crate) host_addr: Option<IpAddr>,
pub(crate) host: Host,
pub(crate) port: u16,
@@ -83,6 +85,7 @@ impl Config {
/// Creates a new configuration.
pub fn new(host: String, port: u16) -> Config {
Config {
host_addr: None,
host: Host::Tcp(host),
port,
password: None,
@@ -163,6 +166,15 @@ impl Config {
self
}
pub fn set_host_addr(&mut self, addr: IpAddr) -> &mut Config {
self.host_addr = Some(addr);
self
}
pub fn get_host_addr(&self) -> Option<IpAddr> {
self.host_addr
}
/// Sets the SSL configuration.
///
/// Defaults to `prefer`.

View File

@@ -1,3 +1,5 @@
use std::net::IpAddr;
use postgres_protocol2::message::backend::Message;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
@@ -25,13 +27,14 @@ where
.make_tls_connect(hostname)
.map_err(|e| Error::tls(e.into()))?;
match connect_once(&config.host, config.port, tls, config).await {
match connect_once(config.host_addr, &config.host, config.port, tls, config).await {
Ok((client, connection)) => Ok((client, connection)),
Err(e) => Err(e),
}
}
async fn connect_once<T>(
host_addr: Option<IpAddr>,
host: &Host,
port: u16,
tls: T,
@@ -40,7 +43,7 @@ async fn connect_once<T>(
where
T: TlsConnect<TcpStream>,
{
let socket = connect_socket(host, port, config.connect_timeout).await?;
let socket = connect_socket(host_addr, host, port, config.connect_timeout).await?;
let RawConnection {
stream,
parameters,
@@ -50,6 +53,7 @@ where
} = connect_raw(socket, tls, config).await?;
let socket_config = SocketConfig {
host_addr,
host: host.clone(),
port,
connect_timeout: config.connect_timeout,

View File

@@ -1,5 +1,6 @@
use std::future::Future;
use std::io;
use std::net::{IpAddr, SocketAddr};
use std::time::Duration;
use tokio::net::{self, TcpStream};
@@ -9,15 +10,20 @@ use crate::Error;
use crate::config::Host;
pub(crate) async fn connect_socket(
host_addr: Option<IpAddr>,
host: &Host,
port: u16,
connect_timeout: Option<Duration>,
) -> Result<TcpStream, Error> {
match host {
Host::Tcp(host) => {
let addrs = net::lookup_host((&**host, port))
.await
.map_err(Error::connect)?;
let addrs = match host_addr {
Some(addr) => vec![SocketAddr::new(addr, port)],
None => net::lookup_host((&**host, port))
.await
.map_err(Error::connect)?
.collect(),
};
let mut last_err = None;

View File

@@ -130,11 +130,7 @@ impl StorageModel {
break;
}
}
if possible {
Some(snapshot_later)
} else {
None
}
if possible { Some(snapshot_later) } else { None }
} else {
None
};

View File

@@ -76,7 +76,10 @@ pub fn draw_svg(
let mut result = String::new();
writeln!(result, "<svg xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\" height=\"300\" width=\"500\">")?;
writeln!(
result,
"<svg xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\" height=\"300\" width=\"500\">"
)?;
draw.calculate_svg_layout();

View File

@@ -1,8 +1,8 @@
//! Tracing wrapper for Hyper HTTP server
use hyper0::HeaderMap;
use hyper0::{Body, Request, Response};
use std::future::Future;
use hyper0::{Body, HeaderMap, Request, Response};
use tracing::Instrument;
use tracing_opentelemetry::OpenTelemetrySpanExt;

View File

@@ -36,11 +36,11 @@
pub mod http;
use opentelemetry::trace::TracerProvider;
use opentelemetry::KeyValue;
use opentelemetry::trace::TracerProvider;
use tracing::Subscriber;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::Layer;
use tracing_subscriber::registry::LookupSpan;
/// Set up OpenTelemetry exporter, using configuration from environment variables.
///

View File

@@ -15,7 +15,6 @@ arc-swap.workspace = true
sentry.workspace = true
async-compression.workspace = true
anyhow.workspace = true
backtrace.workspace = true
bincode.workspace = true
bytes.workspace = true
camino.workspace = true

View File

@@ -1,6 +1,6 @@
use std::time::Duration;
use criterion::{criterion_group, criterion_main, Bencher, Criterion};
use criterion::{Bencher, Criterion, criterion_group, criterion_main};
use pprof::criterion::{Output, PProfProfiler};
use utils::id;
use utils::logging::log_slow;

View File

@@ -1,12 +1,15 @@
// For details about authentication see docs/authentication.md
use arc_swap::ArcSwap;
use std::{borrow::Cow, fmt::Display, fs, sync::Arc};
use std::borrow::Cow;
use std::fmt::Display;
use std::fs;
use std::sync::Arc;
use anyhow::Result;
use arc_swap::ArcSwap;
use camino::Utf8Path;
use jsonwebtoken::{
decode, encode, Algorithm, DecodingKey, EncodingKey, Header, TokenData, Validation,
Algorithm, DecodingKey, EncodingKey, Header, TokenData, Validation, decode, encode,
};
use serde::{Deserialize, Serialize};
@@ -129,7 +132,9 @@ impl JwtAuth {
anyhow::bail!("path is neither a directory or a file")
};
if decoding_keys.is_empty() {
anyhow::bail!("Configured for JWT auth with zero decoding keys. All JWT gated requests would be rejected.");
anyhow::bail!(
"Configured for JWT auth with zero decoding keys. All JWT gated requests would be rejected."
);
}
Ok(Self::new(decoding_keys))
}
@@ -175,9 +180,10 @@ pub fn encode_from_key_file(claims: &Claims, key_data: &[u8]) -> Result<String>
#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;
use super::*;
// Generated with:
//
// openssl genpkey -algorithm ed25519 -out ed25519-priv.pem
@@ -215,7 +221,9 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
let encoded_eddsa = "eyJhbGciOiJFZERTQSIsInR5cCI6IkpXVCJ9.eyJzY29wZSI6InRlbmFudCIsInRlbmFudF9pZCI6IjNkMWY3NTk1YjQ2ODIzMDMwNGUwYjczY2VjYmNiMDgxIiwiaXNzIjoibmVvbi5jb250cm9scGxhbmUiLCJpYXQiOjE2Nzg0NDI0Nzl9.rNheBnluMJNgXzSTTJoTNIGy4P_qe0JUHl_nVEGuDCTgHOThPVr552EnmKccrCKquPeW3c2YUk0Y9Oh4KyASAw";
// Check it can be validated with the public key
let auth = JwtAuth::new(vec![DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap()]);
let auth = JwtAuth::new(vec![
DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap(),
]);
let claims_from_token = auth.decode(encoded_eddsa).unwrap().claims;
assert_eq!(claims_from_token, expected_claims);
}
@@ -230,7 +238,9 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
let encoded = encode_from_key_file(&claims, TEST_PRIV_KEY_ED25519).unwrap();
// decode it back
let auth = JwtAuth::new(vec![DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap()]);
let auth = JwtAuth::new(vec![
DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap(),
]);
let decoded = auth.decode(&encoded).unwrap();
assert_eq!(decoded.claims, claims);

View File

@@ -121,10 +121,12 @@ where
#[cfg(test)]
mod tests {
use super::*;
use std::io;
use tokio::sync::Mutex;
use super::*;
#[test]
fn backoff_defaults_produce_growing_backoff_sequence() {
let mut current_backoff_value = None;

View File

@@ -13,9 +13,11 @@
#![warn(missing_docs)]
use bincode::Options;
use serde::{de::DeserializeOwned, Serialize};
use std::io::{self, Read, Write};
use bincode::Options;
use serde::Serialize;
use serde::de::DeserializeOwned;
use thiserror::Error;
/// An error that occurred during a deserialize operation
@@ -261,10 +263,12 @@ impl<T> LeSer for T {}
#[cfg(test)]
mod tests {
use super::DeserializeError;
use serde::{Deserialize, Serialize};
use std::io::Cursor;
use serde::{Deserialize, Serialize};
use super::DeserializeError;
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ShortStruct {
a: u8,

View File

@@ -1,7 +1,5 @@
use std::{
fmt::Display,
time::{Duration, Instant},
};
use std::fmt::Display;
use std::time::{Duration, Instant};
use metrics::IntCounter;

View File

@@ -1,4 +1,5 @@
use tokio_util::task::{task_tracker::TaskTrackerToken, TaskTracker};
use tokio_util::task::TaskTracker;
use tokio_util::task::task_tracker::TaskTrackerToken;
/// While a reference is kept around, the associated [`Barrier::wait`] will wait.
///

Some files were not shown because too many files have changed in this diff Show More