Compare commits

..

214 Commits

Author SHA1 Message Date
quantumish
44ec0e6f06 Document problems and pitfalls with fine-grained hashmap impl 2025-08-12 13:42:48 -07:00
quantumish
d33071a386 Fix concurrency bugs in resizing (WIP) 2025-08-12 10:34:29 -07:00
David Freifeld
add1a0ad78 Add initial work in implementing incremental resizing (WIP) 2025-07-14 09:02:56 -07:00
David Freifeld
282b90df28 Fix freelist bug, clean up interface to BucketIdx 2025-07-11 12:47:27 -07:00
David Freifeld
7f655ddb9a Heavily WIP rewrite of hashmap for concurrency + perf 2025-07-10 17:34:30 -07:00
David Freifeld
da6f4dbafe Merge branch 'communicator-rewrite' into quantumish/lfc-resizable-map 2025-07-10 17:33:56 -07:00
Heikki Linnakangas
a79fd3bda7 Move logic for picking request slot to the C code
With this refactoring, the Rust code deals with one giant array of
requests, and doesn't know that it's sliced up per backend
process. The C code is now responsible for slicing it up.

This also adds code to complete old IOs at backends start that were
started and left behind by a previous session. That was a little more
straightforward to do with the refactoring, which is why I tackled it
now.
2025-07-07 12:59:08 +03:00
Heikki Linnakangas
e1b58d5d69 Don't segfault if one of the unimplemented functions are called
We'll need to implement these, but let's stop the crashing for now
2025-07-07 11:33:44 +03:00
Erik Grinaker
9ae004f3bc Rename ShardMap to ShardSpec 2025-07-06 19:13:59 +02:00
Erik Grinaker
341c5f53d8 Restructure get_page retries 2025-07-06 18:35:47 +02:00
Erik Grinaker
4b06b547c1 pageserver/client_grpc: add shard map updates 2025-07-06 13:27:17 +02:00
Heikki Linnakangas
74e0d85a04 fix: Don't lose track of in-progress request if query is cancelled 2025-07-06 13:04:03 +03:00
Erik Grinaker
23ba42446b Fix accidental 1ms sleeps for GetPages 2025-07-06 11:09:58 +02:00
Heikki Linnakangas
71a83daac2 Revert crate dependencies to the versions in main branch
Some tests were failing with "Only request bodies with a known size
can be checksum validated." erros. This is a known issue with more
recent aws client versions, see
https://github.com/neondatabase/neon/issues/11363.
2025-07-05 18:03:19 +03:00
Heikki Linnakangas
1b8355a9f9 put back option lost in merge 2025-07-05 17:36:27 +03:00
Heikki Linnakangas
e14bb4be39 Merge remote-tracking branch 'origin/main' into communicator-rewrite 2025-07-05 16:59:51 +03:00
Heikki Linnakangas
f3a6c0d8ff cargo fmt 2025-07-05 16:26:24 +03:00
Heikki Linnakangas
17ec37aab2 Close gRPC getpage streams on shutdown
Some tests were failing, because pageserver didn't shut down promptly.
Tonic server graceful shutdown was a little too graceful; any open
streams linger until they're closed. Check the cancellation token
while waiting for next request, and close the stream if
shutdown/cancellation was requested.
2025-07-05 16:26:24 +03:00
Heikki Linnakangas
d6ec1f1a1c Skip legacy LFC initialization when communicator is used
It clashes with the initialization of the LFC file
2025-07-05 16:26:24 +03:00
Erik Grinaker
6f3fb4433f Add TODO 2025-07-05 14:15:34 +02:00
Erik Grinaker
d7678df445 Reap idle pool resources 2025-07-05 13:35:28 +02:00
Erik Grinaker
03d9f0ec41 Comment tweaks 2025-07-05 11:16:40 +02:00
Erik Grinaker
56845f2da2 Add GetPageClass::is_bulk 2025-07-05 11:15:28 +02:00
Heikki Linnakangas
9a37bfdf63 Fix re-finding an entry in bucket chain 2025-07-05 00:44:46 +03:00
Heikki Linnakangas
4c916552e8 Reduce logging noise
These are very useful while debugging, but also very noisy; let's dial
it down a little.
2025-07-04 23:11:36 +03:00
Heikki Linnakangas
50fbf4ac53 Fix hash table initialization across forked processes
attach_writer()/reader() are called from each forked process. It's too
late to do initialization there, in fact we used to overwrite the
contents of the hash table (or at least the freelist?) every time a
new process attached to it. The initialization must be done earlier,
in the HashMapInit() constructors.
2025-07-04 23:08:34 +03:00
Erik Grinaker
cb698a3951 Add dedicated client pools for bulk requests 2025-07-04 21:52:25 +02:00
Erik Grinaker
f6cc5cbd0c Split out retry handler to separate module 2025-07-04 20:20:09 +02:00
Heikki Linnakangas
00affada26 Add request ID to all communicator log lines as context information 2025-07-04 20:34:26 +03:00
Heikki Linnakangas
90d3c09c24 Minor cleanup
Tidy up and add some comments. Rename a few things for clarity.
2025-07-04 20:32:59 +03:00
Heikki Linnakangas
6c398aeae7 Fix dependency in Makefile 2025-07-04 20:24:21 +03:00
Heikki Linnakangas
1856bbbb9f Minor cleanup and commenting 2025-07-04 18:28:34 +03:00
Heikki Linnakangas
bd46dd60a0 Add a temporary timeout to handling an IO request in the communicator
It's nicer to timeout in the communicator and return an error to the
backend, than PANIC the backend.
2025-07-04 16:08:22 +03:00
Heikki Linnakangas
5f2d476a58 Add request ID to io-in-progress locking table, to ease debugging
I also added INFO messages for when a backend blocks on the
io-in-progress lock. It's probably too noisy for production, but
useful now to get a picture of how much it happens.
2025-07-04 15:55:57 +03:00
Heikki Linnakangas
3231cb6138 Await the io-in-progress locking futures
Otherwise they don't do anything. Oops.
2025-07-04 15:55:57 +03:00
Heikki Linnakangas
e558e0da5c Assign request_id earlier, in the originating backend
Makes it more useful for stitching together logs etc. for a specific
request.
2025-07-04 15:55:55 +03:00
Heikki Linnakangas
70bf2e088d Request multiple block numbers in a single GetPageV request
That's how it was always intended to be used
2025-07-04 15:49:04 +03:00
Heikki Linnakangas
da3f9ee72d cargo fmt 2025-07-04 12:39:41 +03:00
Erik Grinaker
88d1127bf4 Tweak GetPageSplitter 2025-07-03 21:12:26 +02:00
David Freifeld
794bb7a9e8 Merge branch 'quantumish/comm-lfc-integration' into communicator-rewrite 2025-07-03 10:52:29 -07:00
Erik Grinaker
42e4e5a418 Add GetPage request splitting 2025-07-03 18:31:12 +02:00
Heikki Linnakangas
96a817fa2b Fix the case that storage auth token is _not_ used
I broke that in previous commit while fixing the case of using a token.
2025-07-03 18:39:06 +03:00
Heikki Linnakangas
e7b057f2e8 Fix passing storage JWT token to the communicator process
Makes the 'test_compute_auth_to_pageserver' test pass
2025-07-03 18:14:22 +03:00
Heikki Linnakangas
956c2f4378 cargo fmt 2025-07-03 16:16:42 +03:00
Heikki Linnakangas
3293e4685e Fix cases where pageserver gets stuck waiting for LSN
The compute might make a request with an LSN that it hasn't even
flushed yet.
2025-07-03 16:14:45 +03:00
Erik Grinaker
6f8650782f Client tweaks 2025-07-03 14:54:23 +02:00
Erik Grinaker
14214eb853 Add client shard routing 2025-07-03 14:42:35 +02:00
Erik Grinaker
d4b4724921 Sanity-check Pageserver URLs 2025-07-03 14:18:14 +02:00
Erik Grinaker
9aba9550dd Instrument client methods 2025-07-03 14:11:53 +02:00
Erik Grinaker
375e8e5592 Improve retries and logging 2025-07-03 14:02:43 +02:00
Erik Grinaker
52c586f678 Restructure shard management 2025-07-03 11:51:19 +02:00
Erik Grinaker
de97b73d6e Lint fixes 2025-07-03 10:38:14 +02:00
Heikki Linnakangas
d8556616c9 Fix running Postgres in "vanilla mode", without neon storage
Some tests do that
2025-07-03 00:32:40 +03:00
Heikki Linnakangas
d8296e60e6 Fix caching of newly extended pages
This fixes read errors e.g. in test_compute_catalog.py test (and
probably many others).
2025-07-02 23:21:42 +03:00
Heikki Linnakangas
7263d6e2e5 Clarify error message if not_modified_lsn > request_lsn
I'm seeing this error from some python tests. Which means there's a
bug in the compute side of course, but it took me a while to figure
that out.
2025-07-02 23:21:42 +03:00
David Freifeld
b018b0ff60 Fix cargo clippy lints for neon-shmem 2025-07-02 13:01:26 -07:00
David Freifeld
86fb7b966a Update integrated_cache.rs to use new hashmap API 2025-07-02 12:18:37 -07:00
David Freifeld
0c099b0944 Merge branch 'quantumish/lfc-resizable-map' into quantumish/comm-lfc-integration 2025-07-02 12:05:24 -07:00
David Freifeld
2fe27f510d Make neon-shmem tests thread-safe and report errno in panics 2025-07-02 11:57:49 -07:00
David Freifeld
19b5618578 Switch to neon_shmem::sync lock_api and integrate into hashmap 2025-07-02 11:44:38 -07:00
Erik Grinaker
12dade35fa Comment tweaks 2025-07-02 14:47:27 +02:00
Erik Grinaker
1ec63bd6bc Misc pool improvements 2025-07-02 14:42:06 +02:00
Heikki Linnakangas
7012b4aa90 Remove --grpc options from neon_local endpoint reconfigure and start calls
They don't exist in neon_local anymore, and aren't actually used in
tests either.
2025-07-02 15:10:18 +03:00
Heikki Linnakangas
2cc28c75be Fix "ERROR: could not read size of rel ..." in many regression tests.
We were incorrectly skipping the call to communicator_new_rel_create(),
which resulted in an error during index build, when the btree build code
tried to check the size of the newly-created relation.
2025-07-02 14:10:11 +03:00
Erik Grinaker
bf01145ae4 Remove some old code 2025-07-02 11:46:54 +02:00
Erik Grinaker
8ab8fc11a3 Use new PageserverClient 2025-07-02 11:27:56 +02:00
Erik Grinaker
6f0af96a54 Add new PageserverClient 2025-07-02 10:59:40 +02:00
Heikki Linnakangas
9913d2668a print retried pageserver requests to log
Not sure how verbose we want this to be in production, but for now,
more is better.

This shows that many tests are failing with errors like these:

    PG:2025-07-01 23:02:34.311 GMT [1456523] LOG:  [COMMUNICATOR] send_process_get_rel_size_request: got error status: NotFound, message: "Read error", details: [], metadata: MetadataMap { headers: {"content-type": "application/grpc", "date": "Tue, 01 Jul 2025 23:02:34 GMT"} }, retrying​

I haven't debugged why that is yet. Did the compute make a bogus request?
2025-07-02 02:04:04 +03:00
Heikki Linnakangas
2fefece77d temporary hack to make regression tests fail faster 2025-07-02 01:42:39 +03:00
Heikki Linnakangas
471191e64e Fix updating relsize cache during WAL replay
This makes some of the test_runner/regress/test_hot_standby.py tests
pass, (Others are still failing..)
2025-07-01 21:22:04 +03:00
Erik Grinaker
f6761760a2 Documentation and tweaks 2025-07-01 17:54:41 +02:00
Erik Grinaker
0bce818d5e Add stream pool 2025-07-01 17:54:41 +02:00
Erik Grinaker
48be1da6ef Add initial client pool 2025-07-01 17:54:41 +02:00
Erik Grinaker
d2efc80e40 Add initial ChannelPool 2025-07-01 17:54:41 +02:00
Erik Grinaker
958c2577f5 pageserver: tighten up page_api::Client 2025-07-01 17:54:41 +02:00
Heikki Linnakangas
175c2e11e3 Add assertions that the legacy relsize cache is not used with new communicator
And fix a few cases where it was being called
2025-07-01 16:44:25 +03:00
Heikki Linnakangas
efdb07e7b6 Implement function to check if page is in local cache
This is needed for read replicas. There's one more TODO that needs to
implemented before read replicas work though, in
neon_extend_rel_size()
2025-07-01 16:22:51 +03:00
Heikki Linnakangas
b0970b415c Don't call legacy lfc function when new communicator is used 2025-07-01 15:47:26 +03:00
David Freifeld
9d3e07ef2c Add initial prototype of shmem sync primitives 2025-06-30 17:07:07 -07:00
Heikki Linnakangas
7429dd711c fix the .metrics.socket filename in the ignore list 2025-06-30 23:41:09 +03:00
Heikki Linnakangas
88ac1e356b Ignore the metrics unix domain socket in tests 2025-06-30 23:39:01 +03:00
Erik Grinaker
c3cb1ab98d Merge branch 'main' into communicator-rewrite 2025-06-30 21:07:01 +02:00
Erik Grinaker
81ac4ef43a Add a generic pool prototype 2025-06-30 14:49:34 +02:00
Erik Grinaker
a5b0fc560c Fix/allow remaining clippy lints 2025-06-30 12:36:20 +02:00
Erik Grinaker
67b04f8ab3 Fix a bunch of linter warnings 2025-06-30 11:10:02 +02:00
Erik Grinaker
9d9e3cd08a Fix test_normal_work grpc param 2025-06-30 10:13:46 +02:00
Heikki Linnakangas
97a8f4ef85 Handle unexpected EOF while doing an LFC read more gracefully
There's a bug somewhere because this happens in python regression
tests. We need to hunt that down, but in any case, let's not get stuck
in an infinite loop if it happens.
2025-06-30 00:59:53 +03:00
Heikki Linnakangas
39f31957e3 Handle pageserver response with different number of pages gracefully
Some tests are hitting this case, where pageserver returns 0 page
images in the response to a GetPage request. I suspect it's because
the code doesn't handle sharding correclty? In any case, let's not
panic on it, but return an IO error to the originating backend.
2025-06-29 23:44:28 +03:00
Heikki Linnakangas
924c6a6fdf Fix handling the case that server closes the stream
- avoid panic by checking for Ok(None) response from
  tonic::Streaming::message() instead of just using unwrap()
- There was a race condition, if the caller sent the message, but the
  receiver task concurrently received Ok(None) indicating the stream
  was closed. (I didn't see that in action, but I think it could happen
  by reading the code)
2025-06-29 22:53:39 +03:00
Heikki Linnakangas
7020476bf5 Run cargo fmt 2025-06-29 22:53:09 +03:00
Heikki Linnakangas
80e948db93 Remove ununused mock factory
After reading the code a few times, I didn't quite understand what it
was, to be honest, or how it was going to be used. Remove it now to
reduce noise, but we can resurrect it from git history if we need it
in the future.
2025-06-29 22:52:48 +03:00
Heikki Linnakangas
bfb30d434c minor code tidy-up 2025-06-29 22:51:34 +03:00
Heikki Linnakangas
f3ba201800 Run cargo fmt 2025-06-29 21:21:07 +03:00
Heikki Linnakangas
8b7796cbfa wip 2025-06-29 21:20:48 +03:00
Heikki Linnakangas
fdc7e9c2a4 Extract repeated code to look up RequestTracker into a helper function 2025-06-29 21:20:14 +03:00
Heikki Linnakangas
a352d290eb Plumb through both libpq and grpc connection strings to the compute
Add a new 'pageserver_connection_info' field in the compute spec. It
replaces the old 'pageserver_connstring' field with a more complicated
struct that includes both libpq and grpc URLs, for each shard (or only
one of the the URLs, depending on the configuration). It also includes
a flag suggesting which one to use; compute_ctl now uses it to decide
which protocol to use for the basebackup.

This is compatible with everything that's in production, because the
control plane never used the 'pageserver_connstring' field. That was
added a long time ago with the idea that it would replace the code
that digs the 'neon.pageserver_connstring' GUC from the list of
Postgres settings, but we never got around to do that in the control
plane. Hence, it was only used with neon_local. But the plan now is to
pass the 'pageserver_connection_info' from the control plane, and once
that's fully deployed everywhere, the code to parse
'neon.pageserver_connstring' in compute_ctl can be removed.

The 'grpc' flag on an endpoint in endpoint config is now more of a
suggestion. Compute_ctl gets both URLs, so it can choose to use libpq
or grpc as it wishes. It currently always obeys the 'prefer_grpc' flag
that's part of the connection info though. Postgres however uses grpc
iff the new rust-based communicator is enabled.

TODO/plan for the control plane:

- Start to pass `pageserver_connection_info` in the spec file.
- Also keep the current `neon.pageserver_connstring` setting for now,
  for backwards compatibility with old computes

After that, the `pageserver_connection_info.prefer_grpc` flag in the
spec file can be used to control whether compute_ctl uses grpc or
libpq.  The actual compute's grpc usage will be controlled by the
`neon.enable_new_communicator` GUC. It can be set separately from
'prefer_grpc'.

Later:

- Once all old computes are gone, remove the code to pass
  `neon.pageserver_connstring`
2025-06-29 18:16:49 +03:00
Heikki Linnakangas
8c122a1c98 Don't call into the old LFC when using the new communicator
This fixes errors like `index "pg_class_relname_nsp_index" contains
unexpected zero page at block 2` when running the python tests

smgrzeroextend() still called into the old LFC's lfc_write() function,
even when using the new communicator, which zeroed some arbitrary
pages in the LFC file, overwriting pages managed by the new LFC
implementation managed by `integrated_cache.rs`
2025-06-29 17:40:46 +03:00
David Freifeld
74330920ee Simplify API, squash bugs, and expand hashmap test suite 2025-06-27 17:11:22 -07:00
David Freifeld
c3c136ef3a Remove statistics utilities from neon_shmem crate 2025-06-27 17:10:52 -07:00
David Freifeld
78b6da270b Sketchily integrate hashmap rewrite with integrated_cache 2025-06-26 16:45:48 -07:00
David Freifeld
47664e40d4 Initial work in visualizing properties of hashmap 2025-06-26 16:00:33 -07:00
David Freifeld
b1e3161d4e Satisfy cargo clippy lints, simplify shrinking API 2025-06-26 14:32:32 -07:00
David Freifeld
4713715c59 Merge branch 'communicator-rewrite' of github.com:neondatabase/neon into communicator-rewrite 2025-06-26 10:26:41 -07:00
David Freifeld
1e74b52f7e Merge branch 'quantumish/lfc-resizable-map' into communicator-rewrite 2025-06-26 10:26:22 -07:00
Erik Grinaker
e3ecdfbecc pgxn/neon: actually use UNAME_S 2025-06-26 12:38:44 +02:00
Erik Grinaker
d08e553835 pgxn/neon: fix callback_get_request_lsn_unsafe return type 2025-06-26 12:33:59 +02:00
Erik Grinaker
7fffb5b4df pgxn/neon: fix macOS build 2025-06-26 12:33:39 +02:00
David Freifeld
1fb3639170 Properly change type of HashMapInit in .with_hasher() 2025-06-25 03:03:19 -07:00
David Freifeld
00dfaa2eb4 Add Criterion microbenchmarks for rehashing and insertions 2025-06-24 16:30:59 -07:00
David Freifeld
ae740ca1bb Document hashmap implementation, fix get_bucket_for_value
Previously, `get_bucket_for_value` incorrectly divided by the size of
`V` to get the bucket index. Now it divides by the size of `Bucket<K,V>`.
2025-06-24 16:27:17 -07:00
David Freifeld
24e6c68772 Remove prev entry tracking, refactor HashMapInit into proper builder 2025-06-24 13:34:22 -07:00
David Freifeld
93a45708ff Change finish_shrink to remap entries in shrunk space 2025-06-23 16:15:43 -07:00
Heikki Linnakangas
46b5c0be0b Remove duplicated migration script
I messed this up during the merge I guess?
2025-06-23 19:46:32 +03:00
Heikki Linnakangas
2d913ff125 fix some mismerges 2025-06-23 18:21:16 +03:00
Heikki Linnakangas
e90be06d46 silence a few compiler warnings
about unnecessary 'mut's and 'use's
2025-06-23 18:16:54 +03:00
Heikki Linnakangas
356ba67607 Merge remote-tracking branch 'origin/main' into HEAD
I also included build script changes from
https://github.com/neondatabase/neon/pull/12266, which is not yet
merged but will be soon.
2025-06-23 17:46:30 +03:00
David Freifeld
610ea22c46 Generalize map to allow arbitrary hash fns, add clear() helper method 2025-06-20 11:46:02 -07:00
Heikki Linnakangas
1847f4de54 Add missing #include.
Got a warning on macos without this
2025-06-18 17:26:20 +03:00
David Freifeld
477648b8cd Clean up hashmap implementation, add bucket tests 2025-06-17 11:23:10 -07:00
Heikki Linnakangas
e8af3a2811 remove unused struct in example code, to silence compiler warning 2025-06-17 02:09:21 +03:00
Heikki Linnakangas
b603e3dddb Silence compiler warnings in example code 2025-06-17 02:07:33 +03:00
Heikki Linnakangas
83007782fd fix compilation of example 2025-06-17 02:07:15 +03:00
David Freifeld
bb1e359872 Add testing utilities for hash map, freelist bugfixes 2025-06-16 16:02:39 -07:00
David Freifeld
ac87544e79 Implement shrinking, add basic tests for core operations 2025-06-16 13:13:38 -07:00
David Freifeld
b6b122e07b nw: add shrinking and deletion skeletons 2025-06-16 10:20:30 -07:00
Erik Grinaker
782062014e Fix test_normal_work endpoint restart 2025-06-16 10:16:27 +02:00
Erik Grinaker
d0b3629412 Tweak base backups 2025-06-13 13:47:26 -07:00
Heikki Linnakangas
16d6898e44 git add missing file 2025-06-12 02:37:59 +03:00
Erik Grinaker
f4d51c0f5c Use gRPC for test_normal_work 2025-06-09 22:51:15 +02:00
Erik Grinaker
ec17ae0658 Handle gRPC basebackups in compute_ctl 2025-06-09 22:50:57 +02:00
Erik Grinaker
9ecce60ded Plumb gRPC addr through storage-controller 2025-06-09 20:24:18 +02:00
Erik Grinaker
e74a957045 test_runner: initial gRPC protocol support 2025-06-06 16:56:33 +02:00
Erik Grinaker
396a16a3b2 test_runner: enable gRPC Pageserver 2025-06-06 14:55:29 +02:00
Elizabeth Murray
7140a50225 Minor changes to get integration tests to run for communicator. 2025-06-06 04:32:51 +02:00
Elizabeth Murray
68f18ccacf Request Tracker Prototype
Does not include splitting requests across shards.
2025-06-05 13:32:18 -07:00
Heikki Linnakangas
786888d93f Instead of a fixed TCP port for metrics, listen on a unix domain socket
That avoids clashes if you run two computes at the same time. More
secure too. We might want to have a TCP port in the long run, but this
is less trouble for now.

To see the metrics with curl you can use:

    curl --unix-socket .neon/endpoints/ep-main/pgdata/.metrics.socket http://localhost/metrics
2025-06-05 21:28:11 +03:00
Heikki Linnakangas
255537dda1 avoid hitting assertion failure in MarkPostmasterChildWalSender() 2025-06-05 20:08:32 +03:00
Erik Grinaker
8b494f6a24 Ignore communicator_bindings.h 2025-06-05 17:52:50 +02:00
Erik Grinaker
28a61741b3 Mangle gRPC connstrings to use port 51051 2025-06-05 17:46:58 +02:00
Heikki Linnakangas
10b936bf03 Use a custom Rust implementation to replace the LFC hash table
The new implementation lives in a separately allocated shared memory
area, which could be resized. Resizing it isn't actually implemented
yet, though. It would require some co-operation from the LFC code.
2025-06-05 18:31:29 +03:00
Erik Grinaker
2fb6164bf8 Misc build fixes 2025-06-05 17:22:11 +02:00
Erik Grinaker
328f28dfe5 impl Default for SlabBlockHeader 2025-06-05 17:18:28 +02:00
Erik Grinaker
95838056da Fix RelTag fields 2025-06-05 17:13:51 +02:00
Heikki Linnakangas
6145cfd1c2 Move neon-shmem facility to separate module within the crate 2025-06-05 18:13:03 +03:00
Erik Grinaker
6d451654f1 Remove generated communicator_bindings.h 2025-06-05 17:12:13 +02:00
Heikki Linnakangas
96b4de1de6 Make LFC chunk size a compile-time constant
A runtime setting is nicer, but the next commit will replace the hash
table with a different implementation that requires the value size to
be a compile-time constant.
2025-06-05 18:08:40 +03:00
Heikki Linnakangas
9fdf5fbb7e Use a separate freelist to track LFC "holes"
When the LFC is shrunk, we punch holes in the underlying file to
release the disk space to the OS. We tracked it in the same hash table
as the in-use entries, because that was convenient. However, I'm
working on being able to shrink the hash table too, and once we do
that, we'll need some other place to track the holes. Implement a
simple scheme of an in-memory array and a chain of on-disk blocks for
that.
2025-06-05 18:08:35 +03:00
Erik Grinaker
37c58522a2 Merge branch 'main' into communicator-rewrite 2025-06-05 15:08:05 +02:00
Erik Grinaker
4b6f02e47d Merge branch 'main' into communicator-rewrite 2025-06-04 10:23:29 +02:00
Erik Grinaker
8202c6172f Merge branch 'main' into communicator-rewrite 2025-06-03 16:04:31 +02:00
Erik Grinaker
69a47d789d pageserver: remove gRPC compute service prototype 2025-06-03 13:47:21 +02:00
Erik Grinaker
b36f880710 Fix Linux build failures 2025-06-03 13:37:56 +02:00
Erik Grinaker
745b750f33 Merge branch 'main' into communicator-rewrite 2025-06-03 13:29:45 +02:00
Heikki Linnakangas
f06bb2bbd8 Implement growing the hash table. Fix unit tests. 2025-05-29 15:54:55 +03:00
Heikki Linnakangas
b3c25418a6 Add metrics to track memory usage of the rust communicator 2025-05-29 02:14:01 +03:00
Heikki Linnakangas
33549bad1d use separate hash tables for relsize cache and block mappings 2025-05-28 23:57:55 +03:00
Heikki Linnakangas
009168d711 Add placeholder shmem hashmap implementation
Use that instead of the half-baked Adaptive Radix Tree
implementation. ART would probably be better in the long run, but more
complicated to implement.
2025-05-28 11:08:35 +03:00
Elizabeth Murray
7c9bd542a6 Fix compile warnings, minor cleanup. 2025-05-26 06:30:48 -07:00
Elizabeth Murray
014823b305 Add a new iteration of a new client pool with some updates. 2025-05-26 05:29:32 -07:00
Elizabeth Murray
af9379ccf6 Use a sempahore to gate access to connections. Add metrics for testing. 2025-05-26 05:28:50 -07:00
Heikki Linnakangas
bb28109ffa Merge remote-tracking branch 'origin/main' into communicator-rewrite-with-integrated-cache
There were conflicts because of the differences in the page_api
protocol that was merged to main vs what was on the branch. I adapted
the code for the protocol in main.
2025-05-26 11:52:32 +03:00
Elizabeth Murray
60a0bec1c0 Set default max consumers per connection to a high number. 2025-05-19 07:00:39 -07:00
Elizabeth Murray
31fa7a545d Remove unnecessary info include now that the info message is gone. 2025-05-19 06:52:07 -07:00
Elizabeth Murray
ac464c5f2c Return info message that was used for debugging. 2025-05-19 06:39:16 -07:00
Elizabeth Murray
0dddb1e373 Add back whitespace that was removed. 2025-05-19 06:34:52 -07:00
Elizabeth Murray
3acb263e62 Add first iteration of simulating a flakey network with a custom TCP. 2025-05-19 06:33:30 -07:00
Elizabeth Murray
1e83398cdd Correct out-of-date comment. 2025-05-14 07:31:52 -07:00
Elizabeth Murray
be8ed81532 Connection pool: update error accounting, sweep idle connections, add config options. 2025-05-14 07:31:52 -07:00
Heikki Linnakangas
12b08c4b82 Fix shutdown 2025-05-14 01:49:55 +03:00
Heikki Linnakangas
827358dd03 Handle OOMs a little more gracefully 2025-05-12 23:33:22 +03:00
Heikki Linnakangas
d367273000 minor cleanup 2025-05-12 23:11:55 +03:00
Heikki Linnakangas
e2bad5d9e9 Add debugging HTTP endpoint for dumping the cache tree 2025-05-12 22:54:03 +03:00
Heikki Linnakangas
5623e4665b bunch of fixes 2025-05-12 18:40:54 +03:00
Heikki Linnakangas
8abb4dab6d implement shrinking nodes 2025-05-12 03:57:10 +03:00
Heikki Linnakangas
731667ac37 better metrics of the art tree 2025-05-12 02:08:51 +03:00
Heikki Linnakangas
6a1374d106 Pack tree node structs more tightly, avoiding alignment padding 2025-05-12 01:01:58 +03:00
Heikki Linnakangas
f7c908f2f0 more metrics 2025-05-12 01:01:50 +03:00
Heikki Linnakangas
86671e3a0b Add a bunch of metric counters 2025-05-11 20:11:13 +03:00
Heikki Linnakangas
319cd74f73 Fix eviction 2025-05-11 19:34:50 +03:00
Heikki Linnakangas
0efefbf77c Add a few metrics, fix page eviction 2025-05-10 03:13:28 +03:00
Heikki Linnakangas
e6a4171fa1 fix concurrency issues with the LFC
- Add another locking hash table to track which cached pages are currently being
  modified, by smgrwrite() or smgrread() or by prefetch.

- Use single-value Leaf pages in the art tree. That seems simpler after all,
  and it eliminates some corner cases where a Value needed to be cloned, which
  made it tricky to use atomics or other interior mutability on the Values
2025-05-10 02:36:48 +03:00
Heikki Linnakangas
0c25ea9e31 reduce LOG noise 2025-05-09 18:27:36 +03:00
Heikki Linnakangas
6692321026 Remove dependency on io_uring, use plain std::fs ops instead
io_uring is a great idea in the long term, but for now, let's make it
easier to develop locally on macos, where io_uring is not available.
2025-05-06 17:46:21 +03:00
Heikki Linnakangas
791df28755 Linked list fix and add unit test 2025-05-06 16:46:54 +03:00
Heikki Linnakangas
d20da994f4 git add missing file 2025-05-06 15:36:48 +03:00
Heikki Linnakangas
6dbbdaae73 run 'cargo fmt' 2025-05-06 15:35:56 +03:00
Heikki Linnakangas
977bc09d2a Bunch of fixes, smarter iterator, metrics exporter 2025-05-06 15:28:50 +03:00
Heikki Linnakangas
44269fcd5e Implement simple eviction and free block tracking 2025-05-06 15:28:15 +03:00
Heikki Linnakangas
44cc648dc8 Implement iterator over keys
the implementation is not very optimized, but probably good enough for an MVP
2025-05-06 15:27:38 +03:00
Heikki Linnakangas
884e028a4a implement deletion in art tree 2025-05-06 15:27:38 +03:00
Heikki Linnakangas
42df3e5453 debugging stats 2025-05-06 15:27:38 +03:00
Heikki Linnakangas
fc743e284f more work on allocators 2025-05-06 15:27:38 +03:00
Heikki Linnakangas
d02f9a2139 Collect garbage, handle OOMs 2025-05-06 15:27:38 +03:00
Heikki Linnakangas
083118e98e Implement epoch system 2025-05-06 15:27:38 +03:00
Heikki Linnakangas
54cd2272f1 more memory allocation stuff 2025-05-06 15:27:38 +03:00
Heikki Linnakangas
e40193e3c8 simple block-based allocator 2025-05-06 15:27:38 +03:00
Heikki Linnakangas
ce9f7bacc1 Fix communicator client for recent changes in protocol and client code 2025-05-06 15:26:51 +03:00
Heikki Linnakangas
b7891f8fe8 Include 'neon-shard-id' header in client requests 2025-05-06 15:23:30 +03:00
Elizabeth Murray
5f2adaa9ad Remove some additional debug info messages. 2025-05-02 10:50:53 -07:00
Elizabeth Murray
3e5e396c8d Remove some debug info messages. 2025-05-02 10:24:18 -07:00
Elizabeth Murray
9d781c6fda Add a connection pool module to the grpc client. 2025-05-02 10:22:33 -07:00
Erik Grinaker
cf5d038472 service documentation 2025-05-02 15:20:12 +02:00
Erik Grinaker
d785100c02 page_api: add GetPageRequest::class 2025-05-02 10:48:32 +02:00
Erik Grinaker
2c0d930e3d page_api: add GetPageResponse::status 2025-04-30 16:48:45 +02:00
Erik Grinaker
66171a117b page_api: add GetPageRequestBatch 2025-04-30 15:31:11 +02:00
Erik Grinaker
df2806e7a0 page_api: add GetPageRequest::id 2025-04-30 15:00:16 +02:00
Erik Grinaker
07631692db page_api: protobuf comments 2025-04-30 12:36:11 +02:00
Erik Grinaker
4c77397943 Add neon-shard-id header 2025-04-30 11:18:06 +02:00
Erik Grinaker
7bb58be546 Use authorization header instead of neon-auth-token 2025-04-30 10:38:44 +02:00
Erik Grinaker
b5373de208 page_api: add get_slru_segment() 2025-04-29 17:59:27 +02:00
Erik Grinaker
b86c610f42 page_api: tweaks 2025-04-29 17:23:51 +02:00
Erik Grinaker
0f520d79ab pageserver: rename data_api to page_api 2025-04-29 15:58:52 +02:00
Heikki Linnakangas
93eb7bb6b8 include lots of changes that went missing by accident 2025-04-29 15:32:27 +03:00
Heikki Linnakangas
e58d0fece1 New communicator, with "integrated" cache accessible from all processes 2025-04-29 11:52:44 +03:00
465 changed files with 17759 additions and 24165 deletions

View File

@@ -21,20 +21,18 @@ platforms = [
# "x86_64-apple-darwin",
# "x86_64-pc-windows-msvc",
]
[final-excludes]
workspace-members = [
# vm_monitor benefits from the same Cargo.lock as the rest of our artifacts, but
# it is built primarly in separate repo neondatabase/autoscaling and thus is excluded
# from depending on workspace-hack because most of the dependencies are not used.
"vm_monitor",
# subzero-core is a stub crate that should be excluded from workspace-hack
"subzero-core",
# All of these exist in libs and are not usually built independently.
# Putting workspace hack there adds a bottleneck for cargo builds.
"compute_api",
"consumption_metrics",
"desim",
"json",
"metrics",
"pageserver_api",
"postgres_backend",

View File

@@ -27,4 +27,4 @@
!storage_controller/
!vendor/postgres-*/
!workspace_hack/
!build-tools/patches
!build_tools/patches

View File

@@ -7,7 +7,6 @@ self-hosted-runner:
- small-metal
- small-arm64
- unit-perf
- unit-perf-aws-arm
- us-east-2
config-variables:
- AWS_ECR_REGION
@@ -31,7 +30,6 @@ config-variables:
- NEON_PROD_AWS_ACCOUNT_ID
- PGREGRESS_PG16_PROJECT_ID
- PGREGRESS_PG17_PROJECT_ID
- PREWARM_PGBENCH_SIZE
- REMOTE_STORAGE_AZURE_CONTAINER
- REMOTE_STORAGE_AZURE_REGION
- SLACK_CICD_CHANNEL_ID

View File

@@ -1,28 +0,0 @@
name: 'Prepare current job for subzero'
description: >
Set git token to access `neondatabase/subzero` from cargo build,
and set `CARGO_NET_GIT_FETCH_WITH_CLI=true` env variable to use git CLI
inputs:
token:
description: 'GitHub token with access to neondatabase/subzero'
required: true
runs:
using: "composite"
steps:
- name: Set git token for neondatabase/subzero
uses: pyTooling/Actions/with-post-step@2307b526df64d55e95884e072e49aac2a00a9afa # v5.1.0
env:
SUBZERO_ACCESS_TOKEN: ${{ inputs.token }}
with:
main: |
git config --global url."https://x-access-token:${SUBZERO_ACCESS_TOKEN}@github.com/neondatabase/subzero".insteadOf "https://github.com/neondatabase/subzero"
cargo add -p proxy subzero-core --git https://github.com/neondatabase/subzero --rev 396264617e78e8be428682f87469bb25429af88a
post: |
git config --global --unset url."https://x-access-token:${SUBZERO_ACCESS_TOKEN}@github.com/neondatabase/subzero".insteadOf "https://github.com/neondatabase/subzero"
- name: Set `CARGO_NET_GIT_FETCH_WITH_CLI=true` env variable
shell: bash -euxo pipefail {0}
run: echo "CARGO_NET_GIT_FETCH_WITH_CLI=true" >> ${GITHUB_ENV}

View File

@@ -176,13 +176,7 @@ runs:
fi
if [[ $BUILD_TYPE == "debug" && $RUNNER_ARCH == 'X64' ]]; then
# We don't use code coverage for regression tests (the step is disabled),
# so there's no need to collect it.
# Ref https://github.com/neondatabase/neon/issues/4540
# cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
cov_prefix=()
# Explicitly set LLVM_PROFILE_FILE to /dev/null to avoid writing *.profraw files
export LLVM_PROFILE_FILE=/dev/null
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
else
cov_prefix=()
fi

View File

@@ -86,10 +86,6 @@ jobs:
with:
submodules: true
- uses: ./.github/actions/prepare-for-subzero
with:
token: ${{ secrets.CI_ACCESS_TOKEN }}
- name: Set pg 14 revision for caching
id: pg_v14_rev
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v14) >> $GITHUB_OUTPUT
@@ -120,7 +116,7 @@ jobs:
ARCH: ${{ inputs.arch }}
SANITIZERS: ${{ inputs.sanitizers }}
run: |
CARGO_FLAGS="--locked --features testing,rest_broker"
CARGO_FLAGS="--locked --features testing"
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
cov_prefix="scripts/coverage --profraw-prefix=$GITHUB_JOB --dir=/tmp/coverage run"
CARGO_PROFILE=""
@@ -154,7 +150,7 @@ jobs:
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v14
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools/Dockerfile') }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
- name: Cache postgres v15 build
id: cache_pg_15
@@ -166,7 +162,7 @@ jobs:
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v15
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools/Dockerfile') }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
- name: Cache postgres v16 build
id: cache_pg_16
@@ -178,7 +174,7 @@ jobs:
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v16
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools/Dockerfile') }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
- name: Cache postgres v17 build
id: cache_pg_17
@@ -190,7 +186,7 @@ jobs:
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
use-fallback: false
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools/Dockerfile') }}
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
- name: Build all
# Note: the Makefile picks up BUILD_TYPE and CARGO_PROFILE from the env variables

View File

@@ -46,10 +46,6 @@ jobs:
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
submodules: true
- uses: ./.github/actions/prepare-for-subzero
with:
token: ${{ secrets.CI_ACCESS_TOKEN }}
- name: Cache cargo deps
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0

View File

@@ -219,7 +219,6 @@ jobs:
--ignore test_runner/performance/test_cumulative_statistics_persistence.py
--ignore test_runner/performance/test_perf_many_relations.py
--ignore test_runner/performance/test_perf_oltp_large_tenant.py
--ignore test_runner/performance/test_lfc_prewarm.py
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -411,77 +410,6 @@ jobs:
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
prewarm-test:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
env:
PGBENCH_SIZE: ${{ vars.PREWARM_PGBENCH_SIZE }}
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 17
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: "neon-staging"
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Run prewarm benchmark
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance/test_lfc_prewarm.py
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 5400
pg_version: ${{ env.DEFAULT_PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
NEON_API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Create Allure report
id: create-allure-report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
generate-matrices:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
# Create matrices for the benchmarking jobs, so we run benchmarks on rds only once a week (on Saturday)

View File

@@ -72,7 +72,7 @@ jobs:
ARCHS: ${{ inputs.archs || '["x64","arm64"]' }}
DEBIANS: ${{ inputs.debians || '["bullseye","bookworm"]' }}
IMAGE_TAG: |
${{ hashFiles('build-tools/Dockerfile',
${{ hashFiles('build-tools.Dockerfile',
'.github/workflows/build-build-tools-image.yml') }}
run: |
echo "archs=${ARCHS}" | tee -a ${GITHUB_OUTPUT}
@@ -144,7 +144,7 @@ jobs:
- uses: docker/build-push-action@471d1dc4e07e5cdedd4c2171150001c434f0b7a4 # v6.15.0
with:
file: build-tools/Dockerfile
file: build-tools.Dockerfile
context: .
provenance: false
push: true

View File

@@ -54,10 +54,6 @@ jobs:
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
submodules: true
- uses: ./.github/actions/prepare-for-subzero
with:
token: ${{ secrets.CI_ACCESS_TOKEN }}
- name: Install build dependencies
run: |

View File

@@ -87,29 +87,6 @@ jobs:
uses: ./.github/workflows/build-build-tools-image.yml
secrets: inherit
lint-yamls:
needs: [ meta, check-permissions, build-build-tools-image ]
# We do need to run this in `.*-rc-pr` because of hotfixes.
if: ${{ contains(fromJSON('["pr", "push-main", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
runs-on: [ self-hosted, small ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- run: make -C compute manifest-schema-validation
- run: make lint-openapi-spec
check-codestyle-python:
needs: [ meta, check-permissions, build-build-tools-image ]
# No need to run on `main` because we this in the merge queue. We do need to run this in `.*-rc-pr` because of hotfixes.
@@ -222,6 +199,28 @@ jobs:
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
secrets: inherit
validate-compute-manifest:
runs-on: ubuntu-22.04
needs: [ meta, check-permissions ]
# We do need to run this in `.*-rc-pr` because of hotfixes.
if: ${{ contains(fromJSON('["pr", "push-main", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Set up Node.js
uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0
with:
node-version: '24'
- name: Validate manifest against schema
run: |
make -C compute manifest-schema-validation
build-and-test-locally:
needs: [ meta, build-build-tools-image ]
# We do need to run this in `.*-rc-pr` because of hotfixes.
@@ -307,14 +306,14 @@ jobs:
statuses: write
contents: write
pull-requests: write
runs-on: [ self-hosted, unit-perf-aws-arm ]
runs-on: [ self-hosted, unit-perf ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
# for changed limits, see comments on `options:` earlier in this file
options: --init --shm-size=512mb --ulimit memlock=67108864:67108864 --ulimit nofile=65536:65536 --security-opt seccomp=unconfined
options: --init --shm-size=512mb --ulimit memlock=67108864:67108864
strategy:
fail-fast: false
matrix:
@@ -632,8 +631,6 @@ jobs:
BUILD_TAG=${{ needs.meta.outputs.release-tag || needs.meta.outputs.build-tag }}
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}-bookworm
DEBIAN_VERSION=bookworm
secrets: |
SUBZERO_ACCESS_TOKEN=${{ secrets.CI_ACCESS_TOKEN }}
provenance: false
push: true
pull: true
@@ -989,7 +986,6 @@ jobs:
- name: Verify docker-compose example and test extensions
timeout-minutes: 60
env:
PARALLEL_COMPUTES: 3
TAG: >-
${{
needs.meta.outputs.run-kind == 'compute-rc-pr'

View File

@@ -72,7 +72,6 @@ jobs:
check-macos-build:
needs: [ check-permissions, files-changed ]
uses: ./.github/workflows/build-macos.yml
secrets: inherit
with:
pg_versions: ${{ needs.files-changed.outputs.postgres_changes }}
rebuild_rust_code: ${{ fromJSON(needs.files-changed.outputs.rebuild_rust_code) }}

View File

@@ -1,4 +1,4 @@
name: Periodic pagebench performance test on unit-perf-aws-arm runners
name: Periodic pagebench performance test on unit-perf hetzner runner
on:
schedule:
@@ -40,7 +40,7 @@ jobs:
statuses: write
contents: write
pull-requests: write
runs-on: [ self-hosted, unit-perf-aws-arm ]
runs-on: [ self-hosted, unit-perf ]
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:

View File

@@ -1,4 +1,4 @@
name: Periodic proxy performance test on unit-perf-aws-arm runners
name: Periodic proxy performance test on unit-perf hetzner runner
on:
push: # TODO: remove after testing
@@ -32,7 +32,7 @@ jobs:
statuses: write
contents: write
pull-requests: write
runs-on: [self-hosted, unit-perf-aws-arm]
runs-on: [self-hosted, unit-perf]
timeout-minutes: 60 # 1h timeout
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm

10
.gitignore vendored
View File

@@ -15,7 +15,7 @@ neon.iml
/.neon
/integration_tests/.neon
compaction-suite-results.*
docker-compose/docker-compose-parallel.yml
pgxn/neon/communicator/communicator_bindings.h
# Coverage
*.profraw
@@ -26,14 +26,6 @@ docker-compose/docker-compose-parallel.yml
*.o
*.so
*.Po
*.pid
# pgindent typedef lists
*.list
# Node
**/node_modules/
# various files for local testing
/proxy/.subzero
local_proxy.json

8
.gitmodules vendored
View File

@@ -1,16 +1,16 @@
[submodule "vendor/postgres-v14"]
path = vendor/postgres-v14
url = ../postgres.git
url = https://github.com/neondatabase/postgres.git
branch = REL_14_STABLE_neon
[submodule "vendor/postgres-v15"]
path = vendor/postgres-v15
url = ../postgres.git
url = https://github.com/neondatabase/postgres.git
branch = REL_15_STABLE_neon
[submodule "vendor/postgres-v16"]
path = vendor/postgres-v16
url = ../postgres.git
url = https://github.com/neondatabase/postgres.git
branch = REL_16_STABLE_neon
[submodule "vendor/postgres-v17"]
path = vendor/postgres-v17
url = ../postgres.git
url = https://github.com/neondatabase/postgres.git
branch = REL_17_STABLE_neon

726
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -35,6 +35,7 @@ members = [
"libs/pq_proto",
"libs/tenant_size_model",
"libs/metrics",
"libs/neonart",
"libs/postgres_connection",
"libs/remote_storage",
"libs/tracing-utils",
@@ -43,13 +44,11 @@ members = [
"libs/walproposer",
"libs/wal_decoder",
"libs/postgres_initdb",
"libs/proxy/json",
"libs/proxy/postgres-protocol2",
"libs/proxy/postgres-types2",
"libs/proxy/tokio-postgres2",
"endpoint_storage",
"pgxn/neon/communicator",
"proxy/subzero_core",
]
[workspace.package]
@@ -93,6 +92,7 @@ clap = { version = "4.0", features = ["derive", "env"] }
clashmap = { version = "1.0", features = ["raw-api"] }
comfy-table = "7.1"
const_format = "0.2"
crossbeam-utils = "0.8.21"
crc32c = "0.6"
diatomic-waker = { version = "0.2.3" }
either = "1.8"
@@ -131,7 +131,6 @@ jemalloc_pprof = { version = "0.7", features = ["symbolize", "flamegraph"] }
jsonwebtoken = "9"
lasso = "0.7"
libc = "0.2"
lock_api = "0.4.13"
md5 = "0.7.0"
measured = { version = "0.0.22", features=["lasso"] }
measured-process = { version = "0.0.22" }
@@ -143,33 +142,32 @@ notify = "6.0.0"
num_cpus = "1.15"
num-traits = "0.2.19"
once_cell = "1.13"
opentelemetry = "0.30"
opentelemetry_sdk = "0.30"
opentelemetry-otlp = { version = "0.30", default-features = false, features = ["http-proto", "trace", "http", "reqwest-blocking-client"] }
opentelemetry-semantic-conventions = "0.30"
opentelemetry = "0.27"
opentelemetry_sdk = "0.27"
opentelemetry-otlp = { version = "0.27", default-features = false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-semantic-conventions = "0.27"
parking_lot = "0.12"
parquet = { version = "53", default-features = false, features = ["zstd"] }
parquet_derive = "53"
pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
pem = "3.0.3"
peekable = "0.3.0"
pin-project-lite = "0.2"
pprof = { version = "0.14", features = ["criterion", "flamegraph", "frame-pointer", "prost-codec"] }
procfs = "0.16"
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
prost = "0.13.5"
prost-types = "0.13.5"
rand = "0.9"
# Remove after p256 is updated to 0.14.
rand_core = "=0.6"
rand = "0.8"
redis = { version = "0.29.2", features = ["tokio-rustls-comp", "keep-alive"] }
regex = "1.10.2"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] }
reqwest-tracing = { version = "0.5", features = ["opentelemetry_0_30"] }
reqwest-tracing = { version = "0.5", features = ["opentelemetry_0_27"] }
reqwest-middleware = "0.4"
reqwest-retry = "0.7"
routerify = "3"
rpds = "0.13"
rustc-hash = "2.1.1"
rustc-hash = "1.1.0"
rustls = { version = "0.23.16", default-features = false }
rustls-pemfile = "2"
rustls-pki-types = "1.11"
@@ -190,6 +188,7 @@ smallvec = "1.11"
smol_str = { version = "0.2.0", features = ["serde"] }
socket2 = "0.5"
spki = "0.7.3"
spin = "0.9.8"
strum = "0.26"
strum_macros = "0.26"
"subtle" = "2.5.0"
@@ -201,11 +200,10 @@ thiserror = "1.0"
tikv-jemallocator = { version = "0.6", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] }
tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] }
tokio = { version = "1.43.1", features = ["macros"] }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
tokio-io-timeout = "1.2.0"
tokio-postgres-rustls = "0.12.0"
tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]}
tokio-stream = { version = "0.1", features = ["sync"] }
tokio-stream = "0.1"
tokio-tar = "0.3"
tokio-util = { version = "0.7.10", features = ["io", "io-util", "rt"] }
toml = "0.8"
@@ -214,15 +212,17 @@ tonic = { version = "0.13.1", default-features = false, features = ["channel", "
tonic-reflection = { version = "0.13.1", features = ["server"] }
tower = { version = "0.5.2", default-features = false }
tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] }
tower-otel = { version = "0.6", features = ["axum"] }
# This revision uses opentelemetry 0.27. There's no tag for it.
tower-otel = { git = "https://github.com/mattiapenati/tower-otel", rev = "56a7321053bcb72443888257b622ba0d43a11fcd" }
tower-service = "0.3.3"
tracing = "0.1"
tracing-error = "0.2"
tracing-log = "0.2"
tracing-opentelemetry = "0.31"
tracing-opentelemetry = "0.28"
tracing-serde = "0.2.0"
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] }
tracing-appender = "0.2.3"
try-lock = "0.2.5"
test-log = { version = "0.2.17", default-features = false, features = ["log"] }
twox-hash = { version = "1.6.3", default-features = false }
@@ -241,6 +241,9 @@ x509-cert = { version = "0.2.5" }
env_logger = "0.11"
log = "0.4"
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
uring-common = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }

View File

@@ -63,14 +63,7 @@ WORKDIR /home/nonroot
COPY --chown=nonroot . .
RUN --mount=type=secret,uid=1000,id=SUBZERO_ACCESS_TOKEN \
set -e \
&& if [ -s /run/secrets/SUBZERO_ACCESS_TOKEN ]; then \
export CARGO_NET_GIT_FETCH_WITH_CLI=true && \
git config --global url."https://$(cat /run/secrets/SUBZERO_ACCESS_TOKEN)@github.com/neondatabase/subzero".insteadOf "https://github.com/neondatabase/subzero" && \
cargo add -p proxy subzero-core --git https://github.com/neondatabase/subzero --rev 396264617e78e8be428682f87469bb25429af88a; \
fi \
&& cargo chef prepare --recipe-path recipe.json
RUN cargo chef prepare --recipe-path recipe.json
# Main build image
FROM $REPOSITORY/$IMAGE:$TAG AS build
@@ -78,33 +71,20 @@ WORKDIR /home/nonroot
ARG GIT_VERSION=local
ARG BUILD_TAG
ARG ADDITIONAL_RUSTFLAGS=""
ENV CARGO_FEATURES="default"
# 3. Build cargo dependencies. Note that this step doesn't depend on anything else than
# `recipe.json`, so the layer can be reused as long as none of the dependencies change.
COPY --from=plan /home/nonroot/recipe.json recipe.json
RUN --mount=type=secret,uid=1000,id=SUBZERO_ACCESS_TOKEN \
set -e \
&& if [ -s /run/secrets/SUBZERO_ACCESS_TOKEN ]; then \
export CARGO_NET_GIT_FETCH_WITH_CLI=true && \
git config --global url."https://$(cat /run/secrets/SUBZERO_ACCESS_TOKEN)@github.com/neondatabase/subzero".insteadOf "https://github.com/neondatabase/subzero"; \
fi \
RUN set -e \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo chef cook --locked --release --recipe-path recipe.json
# Perform the main build. We reuse the Postgres build artifacts from the intermediate 'pg-build'
# layer, and the cargo dependencies built in the previous step.
COPY --chown=nonroot --from=pg-build /home/nonroot/pg_install/ pg_install
COPY --chown=nonroot . .
COPY --chown=nonroot --from=plan /home/nonroot/proxy/Cargo.toml proxy/Cargo.toml
COPY --chown=nonroot --from=plan /home/nonroot/Cargo.lock Cargo.lock
RUN --mount=type=secret,uid=1000,id=SUBZERO_ACCESS_TOKEN \
set -e \
&& if [ -s /run/secrets/SUBZERO_ACCESS_TOKEN ]; then \
export CARGO_FEATURES="rest_broker"; \
fi \
RUN set -e \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo build \
--features $CARGO_FEATURES \
--bin pg_sni_router \
--bin pageserver \
--bin pagectl \

View File

@@ -2,7 +2,7 @@ ROOT_PROJECT_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
# Where to install Postgres, default is ./pg_install, maybe useful for package
# managers.
POSTGRES_INSTALL_DIR ?= $(ROOT_PROJECT_DIR)/pg_install
POSTGRES_INSTALL_DIR ?= $(ROOT_PROJECT_DIR)/pg_install/
# Supported PostgreSQL versions
POSTGRES_VERSIONS = v17 v16 v15 v14
@@ -14,7 +14,7 @@ POSTGRES_VERSIONS = v17 v16 v15 v14
# it is derived from BUILD_TYPE.
# All intermediate build artifacts are stored here.
BUILD_DIR := $(ROOT_PROJECT_DIR)/build
BUILD_DIR := build
ICU_PREFIX_DIR := /usr/local/icu
@@ -212,7 +212,7 @@ neon-pgindent: postgres-v17-pg-bsd-indent neon-pg-ext-v17
FIND_TYPEDEF=$(ROOT_PROJECT_DIR)/vendor/postgres-v17/src/tools/find_typedef \
INDENT=$(BUILD_DIR)/v17/src/tools/pg_bsd_indent/pg_bsd_indent \
PGINDENT_SCRIPT=$(ROOT_PROJECT_DIR)/vendor/postgres-v17/src/tools/pgindent/pgindent \
-C $(BUILD_DIR)/pgxn-v17/neon \
-C $(BUILD_DIR)/neon-v17 \
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile pgindent
@@ -220,19 +220,6 @@ neon-pgindent: postgres-v17-pg-bsd-indent neon-pg-ext-v17
setup-pre-commit-hook:
ln -s -f $(ROOT_PROJECT_DIR)/pre-commit.py .git/hooks/pre-commit
build-tools/node_modules: build-tools/package.json
cd build-tools && $(if $(CI),npm ci,npm install)
touch build-tools/node_modules
.PHONY: lint-openapi-spec
lint-openapi-spec: build-tools/node_modules
# operation-2xx-response: pageserver timeline delete returns 404 on success
find . -iname "openapi_spec.y*ml" -exec\
npx --prefix=build-tools/ redocly\
--skip-rule=operation-operationId --skip-rule=operation-summary --extends=minimal\
--skip-rule=no-server-example.com --skip-rule=operation-2xx-response\
lint {} \+
# Targets for building PostgreSQL are defined in postgres.mk.
#
# But if the caller has indicated that PostgreSQL is already

View File

@@ -35,7 +35,7 @@ RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
echo -e "retry_connrefused=on\ntimeout=15\ntries=5\nretry-on-host-error=on\n" > /root/.wgetrc && \
echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc
COPY build-tools/patches/pgcopydbv017.patch /pgcopydbv017.patch
COPY build_tools/patches/pgcopydbv017.patch /pgcopydbv017.patch
RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \
set -e && \
@@ -188,12 +188,6 @@ RUN curl -fsSL 'https://apt.llvm.org/llvm-snapshot.gpg.key' | apt-key add - \
&& bash -c 'for f in /usr/bin/clang*-${LLVM_VERSION} /usr/bin/llvm*-${LLVM_VERSION}; do ln -s "${f}" "${f%-${LLVM_VERSION}}"; done' \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# Install node
ENV NODE_VERSION=24
RUN curl -fsSL https://deb.nodesource.com/setup_${NODE_VERSION}.x | bash - \
&& apt install -y nodejs \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# Install docker
RUN curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg \
&& echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/debian ${DEBIAN_VERSION} stable" > /etc/apt/sources.list.d/docker.list \
@@ -317,14 +311,14 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
. "$HOME/.cargo/env" && \
cargo --version && rustup --version && \
rustup component add llvm-tools rustfmt clippy && \
cargo install rustfilt --locked --version ${RUSTFILT_VERSION} && \
cargo install cargo-hakari --locked --version ${CARGO_HAKARI_VERSION} && \
cargo install cargo-deny --locked --version ${CARGO_DENY_VERSION} && \
cargo install cargo-hack --locked --version ${CARGO_HACK_VERSION} && \
cargo install cargo-nextest --locked --version ${CARGO_NEXTEST_VERSION} && \
cargo install cargo-chef --locked --version ${CARGO_CHEF_VERSION} && \
cargo install diesel_cli --locked --version ${CARGO_DIESEL_CLI_VERSION} \
--features postgres-bundled --no-default-features && \
cargo install rustfilt --version ${RUSTFILT_VERSION} --locked && \
cargo install cargo-hakari --version ${CARGO_HAKARI_VERSION} --locked && \
cargo install cargo-deny --version ${CARGO_DENY_VERSION} --locked && \
cargo install cargo-hack --version ${CARGO_HACK_VERSION} --locked && \
cargo install cargo-nextest --version ${CARGO_NEXTEST_VERSION} --locked && \
cargo install cargo-chef --version ${CARGO_CHEF_VERSION} --locked && \
cargo install diesel_cli --version ${CARGO_DIESEL_CLI_VERSION} --locked \
--features postgres-bundled --no-default-features && \
rm -rf /home/nonroot/.cargo/registry && \
rm -rf /home/nonroot/.cargo/git

File diff suppressed because it is too large Load Diff

View File

@@ -1,8 +0,0 @@
{
"name": "build-tools",
"private": true,
"devDependencies": {
"@redocly/cli": "1.34.4",
"@sourcemeta/jsonschema": "10.0.0"
}
}

View File

@@ -1,12 +1,9 @@
disallowed-methods = [
"tokio::task::block_in_place",
# Allow this for now, to deny it later once we stop using Handle::block_on completely
# "tokio::runtime::Handle::block_on",
# tokio-epoll-uring:
# - allow-invalid because the method doesn't exist on macOS
{ path = "tokio_epoll_uring::thread_local_system", replacement = "tokio_epoll_uring_ext module inside pageserver crate", allow-invalid = true }
# use tokio_epoll_uring_ext instead
"tokio_epoll_uring::thread_local_system",
]
disallowed-macros = [

View File

@@ -50,9 +50,9 @@ jsonnetfmt-format:
jsonnetfmt --in-place $(jsonnet_files)
.PHONY: manifest-schema-validation
manifest-schema-validation: ../build-tools/node_modules
npx --prefix=../build-tools/ jsonschema validate -d https://json-schema.org/draft/2020-12/schema manifest.schema.json manifest.yaml
manifest-schema-validation: node_modules
node_modules/.bin/jsonschema validate -d https://json-schema.org/draft/2020-12/schema manifest.schema.json manifest.yaml
../build-tools/node_modules: ../build-tools/package.json
cd ../build-tools && $(if $(CI),npm ci,npm install)
touch ../build-tools/node_modules
node_modules: package.json
npm install
touch node_modules

View File

@@ -9,7 +9,7 @@
#
# build-tools: This contains Rust compiler toolchain and other tools needed at compile
# time. This is also used for the storage builds. This image is defined in
# build-tools/Dockerfile.
# build-tools.Dockerfile.
#
# build-deps: Contains C compiler, other build tools, and compile-time dependencies
# needed to compile PostgreSQL and most extensions. (Some extensions need
@@ -115,7 +115,7 @@ ARG EXTENSIONS=all
FROM $BASE_IMAGE_SHA AS build-deps
ARG DEBIAN_VERSION
# Keep in sync with build-tools/Dockerfile
# Keep in sync with build-tools.Dockerfile
ENV PROTOC_VERSION=25.1
# Use strict mode for bash to catch errors early
@@ -133,7 +133,7 @@ RUN case $DEBIAN_VERSION in \
# Install newer version (3.25) from backports.
# libstdc++-10-dev is required for plv8
bullseye) \
echo "deb http://archive.debian.org/debian bullseye-backports main" > /etc/apt/sources.list.d/bullseye-backports.list; \
echo "deb http://deb.debian.org/debian bullseye-backports main" > /etc/apt/sources.list.d/bullseye-backports.list; \
VERSION_INSTALLS="cmake/bullseye-backports cmake-data/bullseye-backports libstdc++-10-dev"; \
;; \
# Version-specific installs for Bookworm (PG17):
@@ -170,29 +170,7 @@ RUN case $DEBIAN_VERSION in \
FROM build-deps AS pg-build
ARG PG_VERSION
COPY vendor/postgres-${PG_VERSION:?} postgres
COPY compute/patches/postgres_fdw.patch .
COPY compute/patches/pg_stat_statements_pg14-16.patch .
COPY compute/patches/pg_stat_statements_pg17.patch .
RUN cd postgres && \
# Apply patches to some contrib extensions
# For example, we need to grant EXECUTE on pg_stat_statements_reset() to {privileged_role_name}.
# In vanilla Postgres this function is limited to Postgres role superuser.
# In Neon we have {privileged_role_name} role that is not a superuser but replaces superuser in some cases.
# We could add the additional grant statements to the Postgres repository but it would be hard to maintain,
# whenever we need to pick up a new Postgres version and we want to limit the changes in our Postgres fork,
# so we do it here.
case "${PG_VERSION}" in \
"v14" | "v15" | "v16") \
patch -p1 < /pg_stat_statements_pg14-16.patch; \
;; \
"v17") \
patch -p1 < /pg_stat_statements_pg17.patch; \
;; \
*) \
# To do not forget to migrate patches to the next major version
echo "No contrib patches for this PostgreSQL version" && exit 1;; \
esac && \
patch -p1 < /postgres_fdw.patch && \
export CONFIGURE_CMD="./configure CFLAGS='-O2 -g3 -fsigned-char' --enable-debug --with-openssl --with-uuid=ossp \
--with-icu --with-libxml --with-libxslt --with-lz4" && \
if [ "${PG_VERSION:?}" != "v14" ]; then \
@@ -206,6 +184,8 @@ RUN cd postgres && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/autoinc.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/dblink.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/postgres_fdw.control && \
file=/usr/local/pgsql/share/extension/postgres_fdw--1.0.sql && [ -e $file ] && \
echo 'GRANT USAGE ON FOREIGN DATA WRAPPER postgres_fdw TO neon_superuser;' >> $file && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/bloom.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/earthdistance.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/insert_username.control && \
@@ -215,7 +195,34 @@ RUN cd postgres && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrowlocks.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgstattuple.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/refint.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/xml2.control
echo 'trusted = true' >> /usr/local/pgsql/share/extension/xml2.control && \
# We need to grant EXECUTE on pg_stat_statements_reset() to neon_superuser.
# In vanilla postgres this function is limited to Postgres role superuser.
# In neon we have neon_superuser role that is not a superuser but replaces superuser in some cases.
# We could add the additional grant statements to the postgres repository but it would be hard to maintain,
# whenever we need to pick up a new postgres version and we want to limit the changes in our postgres fork,
# so we do it here.
for file in /usr/local/pgsql/share/extension/pg_stat_statements--*.sql; do \
filename=$(basename "$file"); \
# Note that there are no downgrade scripts for pg_stat_statements, so we \
# don't have to modify any downgrade paths or (much) older versions: we only \
# have to make sure every creation of the pg_stat_statements_reset function \
# also adds execute permissions to the neon_superuser.
case $filename in \
pg_stat_statements--1.4.sql) \
# pg_stat_statements_reset is first created with 1.4
echo 'GRANT EXECUTE ON FUNCTION pg_stat_statements_reset() TO neon_superuser;' >> $file; \
;; \
pg_stat_statements--1.6--1.7.sql) \
# Then with the 1.6-1.7 migration it is re-created with a new signature, thus add the permissions back
echo 'GRANT EXECUTE ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint) TO neon_superuser;' >> $file; \
;; \
pg_stat_statements--1.10--1.11.sql) \
# Then with the 1.10-1.11 migration it is re-created with a new signature again, thus add the permissions back
echo 'GRANT EXECUTE ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint, boolean) TO neon_superuser;' >> $file; \
;; \
esac; \
done;
# Set PATH for all the subsequent build steps
ENV PATH="/usr/local/pgsql/bin:$PATH"
@@ -1517,7 +1524,7 @@ WORKDIR /ext-src
COPY compute/patches/pg_duckdb_v031.patch .
COPY compute/patches/duckdb_v120.patch .
# pg_duckdb build requires source dir to be a git repo to get submodules
# allow {privileged_role_name} to execute some functions that in pg_duckdb are available to superuser only:
# allow neon_superuser to execute some functions that in pg_duckdb are available to superuser only:
# - extension management function duckdb.install_extension()
# - access to duckdb.extensions table and its sequence
RUN git clone --depth 1 --branch v0.3.1 https://github.com/duckdb/pg_duckdb.git pg_duckdb-src && \
@@ -1783,7 +1790,7 @@ RUN set -e \
#########################################################################################
FROM build-deps AS exporters
ARG TARGETARCH
# Keep sql_exporter version same as in build-tools/Dockerfile and
# Keep sql_exporter version same as in build-tools.Dockerfile and
# test_runner/regress/test_compute_metrics.py
# See comment on the top of the file regading `echo`, `-e` and `\n`
RUN if [ "$TARGETARCH" = "amd64" ]; then\
@@ -1908,10 +1915,10 @@ RUN cd /ext-src/pg_repack-src && patch -p1 </ext-src/pg_repack.patch && rm -f /e
COPY --chmod=755 docker-compose/run-tests.sh /run-tests.sh
RUN echo /usr/local/pgsql/lib > /etc/ld.so.conf.d/00-neon.conf && /sbin/ldconfig
RUN apt-get update && apt-get install -y libtap-parser-sourcehandler-pgtap-perl jq parallel \
RUN apt-get update && apt-get install -y libtap-parser-sourcehandler-pgtap-perl jq \
&& apt clean && rm -rf /ext-src/*.tar.gz /ext-src/*.patch /var/lib/apt/lists/*
ENV PATH=/usr/local/pgsql/bin:$PATH
ENV PGHOST=compute1
ENV PGHOST=compute
ENV PGPORT=55433
ENV PGUSER=cloud_admin
ENV PGDATABASE=postgres

7
compute/package.json Normal file
View File

@@ -0,0 +1,7 @@
{
"name": "neon-compute",
"private": true,
"dependencies": {
"@sourcemeta/jsonschema": "9.3.4"
}
}

View File

@@ -1,26 +1,22 @@
diff --git a/sql/anon.sql b/sql/anon.sql
index 0cdc769..5eab1d6 100644
index 0cdc769..b450327 100644
--- a/sql/anon.sql
+++ b/sql/anon.sql
@@ -1141,3 +1141,19 @@ $$
@@ -1141,3 +1141,15 @@ $$
-- TODO : https://en.wikipedia.org/wiki/L-diversity
-- TODO : https://en.wikipedia.org/wiki/T-closeness
+
+-- NEON Patches
+
+GRANT ALL ON SCHEMA anon to neon_superuser;
+GRANT ALL ON ALL TABLES IN SCHEMA anon TO neon_superuser;
+
+DO $$
+DECLARE
+ privileged_role_name text;
+BEGIN
+ privileged_role_name := current_setting('neon.privileged_role_name');
+
+ EXECUTE format('GRANT ALL ON SCHEMA anon to %I', privileged_role_name);
+ EXECUTE format('GRANT ALL ON ALL TABLES IN SCHEMA anon TO %I', privileged_role_name);
+
+ IF current_setting('server_version_num')::int >= 150000 THEN
+ EXECUTE format('GRANT SET ON PARAMETER anon.transparent_dynamic_masking TO %I', privileged_role_name);
+ END IF;
+ IF current_setting('server_version_num')::int >= 150000 THEN
+ GRANT SET ON PARAMETER anon.transparent_dynamic_masking TO neon_superuser;
+ END IF;
+END $$;
diff --git a/sql/init.sql b/sql/init.sql
index 7da6553..9b6164b 100644

View File

@@ -21,21 +21,13 @@ index 3235cc8..6b892bc 100644
include Makefile.global
diff --git a/sql/pg_duckdb--0.2.0--0.3.0.sql b/sql/pg_duckdb--0.2.0--0.3.0.sql
index d777d76..3b54396 100644
index d777d76..af60106 100644
--- a/sql/pg_duckdb--0.2.0--0.3.0.sql
+++ b/sql/pg_duckdb--0.2.0--0.3.0.sql
@@ -1056,3 +1056,14 @@ GRANT ALL ON FUNCTION duckdb.cache(TEXT, TEXT) TO PUBLIC;
@@ -1056,3 +1056,6 @@ GRANT ALL ON FUNCTION duckdb.cache(TEXT, TEXT) TO PUBLIC;
GRANT ALL ON FUNCTION duckdb.cache_info() TO PUBLIC;
GRANT ALL ON FUNCTION duckdb.cache_delete(TEXT) TO PUBLIC;
GRANT ALL ON PROCEDURE duckdb.recycle_ddb() TO PUBLIC;
+
+DO $$
+DECLARE
+ privileged_role_name text;
+BEGIN
+ privileged_role_name := current_setting('neon.privileged_role_name');
+
+ EXECUTE format('GRANT ALL ON FUNCTION duckdb.install_extension(TEXT) TO %I', privileged_role_name);
+ EXECUTE format('GRANT ALL ON TABLE duckdb.extensions TO %I', privileged_role_name);
+ EXECUTE format('GRANT ALL ON SEQUENCE duckdb.extensions_table_seq TO %I', privileged_role_name);
+END $$;
+GRANT ALL ON FUNCTION duckdb.install_extension(TEXT) TO neon_superuser;
+GRANT ALL ON TABLE duckdb.extensions TO neon_superuser;
+GRANT ALL ON SEQUENCE duckdb.extensions_table_seq TO neon_superuser;

View File

@@ -1,34 +0,0 @@
diff --git a/contrib/pg_stat_statements/pg_stat_statements--1.4.sql b/contrib/pg_stat_statements/pg_stat_statements--1.4.sql
index 58cdf600fce..8be57a996f6 100644
--- a/contrib/pg_stat_statements/pg_stat_statements--1.4.sql
+++ b/contrib/pg_stat_statements/pg_stat_statements--1.4.sql
@@ -46,3 +46,12 @@ GRANT SELECT ON pg_stat_statements TO PUBLIC;
-- Don't want this to be available to non-superusers.
REVOKE ALL ON FUNCTION pg_stat_statements_reset() FROM PUBLIC;
+
+DO $$
+DECLARE
+ privileged_role_name text;
+BEGIN
+ privileged_role_name := current_setting('neon.privileged_role_name');
+
+ EXECUTE format('GRANT EXECUTE ON FUNCTION pg_stat_statements_reset() TO %I', privileged_role_name);
+END $$;
diff --git a/contrib/pg_stat_statements/pg_stat_statements--1.6--1.7.sql b/contrib/pg_stat_statements/pg_stat_statements--1.6--1.7.sql
index 6fc3fed4c93..256345a8f79 100644
--- a/contrib/pg_stat_statements/pg_stat_statements--1.6--1.7.sql
+++ b/contrib/pg_stat_statements/pg_stat_statements--1.6--1.7.sql
@@ -20,3 +20,12 @@ LANGUAGE C STRICT PARALLEL SAFE;
-- Don't want this to be available to non-superusers.
REVOKE ALL ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint) FROM PUBLIC;
+
+DO $$
+DECLARE
+ privileged_role_name text;
+BEGIN
+ privileged_role_name := current_setting('neon.privileged_role_name');
+
+ EXECUTE format('GRANT EXECUTE ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint) TO %I', privileged_role_name);
+END $$;

View File

@@ -1,52 +0,0 @@
diff --git a/contrib/pg_stat_statements/pg_stat_statements--1.10--1.11.sql b/contrib/pg_stat_statements/pg_stat_statements--1.10--1.11.sql
index 0bb2c397711..32764db1d8b 100644
--- a/contrib/pg_stat_statements/pg_stat_statements--1.10--1.11.sql
+++ b/contrib/pg_stat_statements/pg_stat_statements--1.10--1.11.sql
@@ -80,3 +80,12 @@ LANGUAGE C STRICT PARALLEL SAFE;
-- Don't want this to be available to non-superusers.
REVOKE ALL ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint, boolean) FROM PUBLIC;
+
+DO $$
+DECLARE
+ privileged_role_name text;
+BEGIN
+ privileged_role_name := current_setting('neon.privileged_role_name');
+
+ EXECUTE format('GRANT EXECUTE ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint, boolean) TO %I', privileged_role_name);
+END $$;
\ No newline at end of file
diff --git a/contrib/pg_stat_statements/pg_stat_statements--1.4.sql b/contrib/pg_stat_statements/pg_stat_statements--1.4.sql
index 58cdf600fce..8be57a996f6 100644
--- a/contrib/pg_stat_statements/pg_stat_statements--1.4.sql
+++ b/contrib/pg_stat_statements/pg_stat_statements--1.4.sql
@@ -46,3 +46,12 @@ GRANT SELECT ON pg_stat_statements TO PUBLIC;
-- Don't want this to be available to non-superusers.
REVOKE ALL ON FUNCTION pg_stat_statements_reset() FROM PUBLIC;
+
+DO $$
+DECLARE
+ privileged_role_name text;
+BEGIN
+ privileged_role_name := current_setting('neon.privileged_role_name');
+
+ EXECUTE format('GRANT EXECUTE ON FUNCTION pg_stat_statements_reset() TO %I', privileged_role_name);
+END $$;
diff --git a/contrib/pg_stat_statements/pg_stat_statements--1.6--1.7.sql b/contrib/pg_stat_statements/pg_stat_statements--1.6--1.7.sql
index 6fc3fed4c93..256345a8f79 100644
--- a/contrib/pg_stat_statements/pg_stat_statements--1.6--1.7.sql
+++ b/contrib/pg_stat_statements/pg_stat_statements--1.6--1.7.sql
@@ -20,3 +20,12 @@ LANGUAGE C STRICT PARALLEL SAFE;
-- Don't want this to be available to non-superusers.
REVOKE ALL ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint) FROM PUBLIC;
+
+DO $$
+DECLARE
+ privileged_role_name text;
+BEGIN
+ privileged_role_name := current_setting('neon.privileged_role_name');
+
+ EXECUTE format('GRANT EXECUTE ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint) TO %I', privileged_role_name);
+END $$;

View File

@@ -1,17 +0,0 @@
diff --git a/contrib/postgres_fdw/postgres_fdw--1.0.sql b/contrib/postgres_fdw/postgres_fdw--1.0.sql
index a0f0fc1bf45..ee077f2eea6 100644
--- a/contrib/postgres_fdw/postgres_fdw--1.0.sql
+++ b/contrib/postgres_fdw/postgres_fdw--1.0.sql
@@ -16,3 +16,12 @@ LANGUAGE C STRICT;
CREATE FOREIGN DATA WRAPPER postgres_fdw
HANDLER postgres_fdw_handler
VALIDATOR postgres_fdw_validator;
+
+DO $$
+DECLARE
+ privileged_role_name text;
+BEGIN
+ privileged_role_name := current_setting('neon.privileged_role_name');
+
+ EXECUTE format('GRANT USAGE ON FOREIGN DATA WRAPPER postgres_fdw TO %I', privileged_role_name);
+END $$;

View File

@@ -27,10 +27,7 @@ fail.workspace = true
flate2.workspace = true
futures.workspace = true
http.workspace = true
http-body-util.workspace = true
hostname-validator = "1.1"
hyper.workspace = true
hyper-util.workspace = true
indexmap.workspace = true
itertools.workspace = true
jsonwebtoken.workspace = true
@@ -47,7 +44,6 @@ postgres.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = ["json"] }
ring = "0.17"
scopeguard.workspace = true
serde.workspace = true
serde_with.workspace = true
serde_json.workspace = true
@@ -62,7 +58,6 @@ tokio-stream.workspace = true
tonic.workspace = true
tower-otel.workspace = true
tracing.workspace = true
tracing-appender.workspace = true
tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
tracing-utils.workspace = true
@@ -71,7 +66,7 @@ url.workspace = true
uuid.workspace = true
walkdir.workspace = true
x509-cert.workspace = true
postgres-types.workspace = true
postgres_versioninfo.workspace = true
postgres_initdb.workspace = true
compute_api.workspace = true

View File

@@ -46,14 +46,11 @@ stateDiagram-v2
Configuration --> Failed : Failed to configure the compute
Configuration --> Running : Compute has been configured
Empty --> Init : Compute spec is immediately available
Empty --> TerminationPendingFast : Requested termination
Empty --> TerminationPendingImmediate : Requested termination
Empty --> TerminationPending : Requested termination
Init --> Failed : Failed to start Postgres
Init --> Running : Started Postgres
Running --> TerminationPendingFast : Requested termination
Running --> TerminationPendingImmediate : Requested termination
TerminationPendingFast --> Terminated compute with 30s delay for cplane to inspect status
TerminationPendingImmediate --> Terminated : Terminated compute immediately
Running --> TerminationPending : Requested termination
TerminationPending --> Terminated : Terminated compute
Failed --> [*] : Compute exited
Terminated --> [*] : Compute exited
```

View File

@@ -51,7 +51,6 @@ use compute_tools::compute::{
use compute_tools::extension_server::get_pg_version_string;
use compute_tools::logger::*;
use compute_tools::params::*;
use compute_tools::pg_isready::get_pg_isready_bin;
use compute_tools::spec::*;
use rlimit::{Resource, setrlimit};
use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM};
@@ -88,14 +87,6 @@ struct Cli {
#[arg(short = 'C', long, value_name = "DATABASE_URL")]
pub connstr: String,
#[arg(
long,
default_value = "neon_superuser",
value_name = "PRIVILEGED_ROLE_NAME",
value_parser = Self::parse_privileged_role_name
)]
pub privileged_role_name: String,
#[cfg(target_os = "linux")]
#[arg(long, default_value = "neon-postgres")]
pub cgroup: String,
@@ -139,12 +130,6 @@ struct Cli {
/// Run in development mode, skipping VM-specific operations like process termination
#[arg(long, action = clap::ArgAction::SetTrue)]
pub dev: bool,
#[arg(long)]
pub pg_init_timeout: Option<u64>,
#[arg(long, default_value_t = false, action = clap::ArgAction::Set)]
pub lakebase_mode: bool,
}
impl Cli {
@@ -164,21 +149,6 @@ impl Cli {
Ok(url)
}
/// For simplicity, we do not escape `privileged_role_name` anywhere in the code.
/// Since it's a system role, which we fully control, that's fine. Still, let's
/// validate it to avoid any surprises.
fn parse_privileged_role_name(value: &str) -> Result<String> {
use regex::Regex;
let pattern = Regex::new(r"^[a-z_]+$").unwrap();
if !pattern.is_match(value) {
bail!("--privileged-role-name can only contain lowercase letters and underscores")
}
Ok(value.to_string())
}
}
fn main() -> Result<()> {
@@ -195,12 +165,7 @@ fn main() -> Result<()> {
.build()?;
let _rt_guard = runtime.enter();
let mut log_dir = None;
if cli.lakebase_mode {
log_dir = std::env::var("COMPUTE_CTL_LOG_DIRECTORY").ok();
}
let (tracing_provider, _file_logs_guard) = init(cli.dev, log_dir)?;
runtime.block_on(init(cli.dev))?;
// enable core dumping for all child processes
setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
@@ -213,7 +178,6 @@ fn main() -> Result<()> {
ComputeNodeParams {
compute_id: cli.compute_id,
connstr,
privileged_role_name: cli.privileged_role_name.clone(),
pgdata: cli.pgdata.clone(),
pgbin: cli.pgbin.clone(),
pgversion: get_pg_version_string(&cli.pgbin),
@@ -231,10 +195,6 @@ fn main() -> Result<()> {
installed_extensions_collection_interval: Arc::new(AtomicU64::new(
cli.installed_extensions_collection_interval,
)),
pg_init_timeout: cli.pg_init_timeout.map(Duration::from_secs),
pg_isready_bin: get_pg_isready_bin(&cli.pgbin),
instance_id: std::env::var("INSTANCE_ID").ok(),
lakebase_mode: cli.lakebase_mode,
},
config,
)?;
@@ -243,17 +203,11 @@ fn main() -> Result<()> {
scenario.teardown();
deinit_and_exit(tracing_provider, exit_code);
deinit_and_exit(exit_code);
}
fn init(
dev_mode: bool,
log_dir: Option<String>,
) -> Result<(
Option<tracing_utils::Provider>,
Option<tracing_appender::non_blocking::WorkerGuard>,
)> {
let (provider, file_logs_guard) = init_tracing_and_logging(DEFAULT_LOG_LEVEL, &log_dir)?;
async fn init(dev_mode: bool) -> Result<()> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL).await?;
let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
thread::spawn(move || {
@@ -264,7 +218,7 @@ fn init(
info!("compute build_tag: {}", &BUILD_TAG.to_string());
Ok((provider, file_logs_guard))
Ok(())
}
fn get_config(cli: &Cli) -> Result<ComputeConfig> {
@@ -289,27 +243,25 @@ fn get_config(cli: &Cli) -> Result<ComputeConfig> {
}
}
fn deinit_and_exit(tracing_provider: Option<tracing_utils::Provider>, exit_code: Option<i32>) -> ! {
if let Some(p) = tracing_provider {
// Shutdown trace pipeline gracefully, so that it has a chance to send any
// pending traces before we exit. Shutting down OTEL tracing provider may
// hang for quite some time, see, for example:
// - https://github.com/open-telemetry/opentelemetry-rust/issues/868
// - and our problems with staging https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636
//
// Yet, we want computes to shut down fast enough, as we may need a new one
// for the same timeline ASAP. So wait no longer than 2s for the shutdown to
// complete, then just error out and exit the main thread.
info!("shutting down tracing");
let (sender, receiver) = mpsc::channel();
let _ = thread::spawn(move || {
_ = p.shutdown();
sender.send(()).ok()
});
let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000));
if shutdown_res.is_err() {
error!("timed out while shutting down tracing, exiting anyway");
}
fn deinit_and_exit(exit_code: Option<i32>) -> ! {
// Shutdown trace pipeline gracefully, so that it has a chance to send any
// pending traces before we exit. Shutting down OTEL tracing provider may
// hang for quite some time, see, for example:
// - https://github.com/open-telemetry/opentelemetry-rust/issues/868
// - and our problems with staging https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636
//
// Yet, we want computes to shut down fast enough, as we may need a new one
// for the same timeline ASAP. So wait no longer than 2s for the shutdown to
// complete, then just error out and exit the main thread.
info!("shutting down tracing");
let (sender, receiver) = mpsc::channel();
let _ = thread::spawn(move || {
tracing_utils::shutdown_tracing();
sender.send(()).ok()
});
let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000));
if shutdown_res.is_err() {
error!("timed out while shutting down tracing, exiting anyway");
}
info!("shutting down");
@@ -375,49 +327,4 @@ mod test {
])
.expect_err("URL parameters are not allowed");
}
#[test]
fn verify_privileged_role_name() {
// Valid name
let cli = Cli::parse_from([
"compute_ctl",
"--pgdata=test",
"--connstr=test",
"--compute-id=test",
"--privileged-role-name",
"my_superuser",
]);
assert_eq!(cli.privileged_role_name, "my_superuser");
// Invalid names
Cli::try_parse_from([
"compute_ctl",
"--pgdata=test",
"--connstr=test",
"--compute-id=test",
"--privileged-role-name",
"NeonSuperuser",
])
.expect_err("uppercase letters are not allowed");
Cli::try_parse_from([
"compute_ctl",
"--pgdata=test",
"--connstr=test",
"--compute-id=test",
"--privileged-role-name",
"$'neon_superuser",
])
.expect_err("special characters are not allowed");
Cli::try_parse_from([
"compute_ctl",
"--pgdata=test",
"--connstr=test",
"--compute-id=test",
"--privileged-role-name",
"",
])
.expect_err("empty name is not allowed");
}
}

View File

@@ -1,98 +0,0 @@
//! Client for making request to a running Postgres server's communicator control socket.
//!
//! The storage communicator process that runs inside Postgres exposes an HTTP endpoint in
//! a Unix Domain Socket in the Postgres data directory. This provides access to it.
use std::path::Path;
use anyhow::Context;
use hyper::client::conn::http1::SendRequest;
use hyper_util::rt::TokioIo;
/// Name of the socket within the Postgres data directory. This better match that in
/// `pgxn/neon/communicator/src/lib.rs`.
const NEON_COMMUNICATOR_SOCKET_NAME: &str = "neon-communicator.socket";
/// Open a connection to the communicator's control socket, prepare to send requests to it
/// with hyper.
pub async fn connect_communicator_socket<B>(pgdata: &Path) -> anyhow::Result<SendRequest<B>>
where
B: hyper::body::Body + 'static + Send,
B::Data: Send,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
let socket_path = pgdata.join(NEON_COMMUNICATOR_SOCKET_NAME);
let socket_path_len = socket_path.display().to_string().len();
// There is a limit of around 100 bytes (108 on Linux?) on the length of the path to a
// Unix Domain socket. The limit is on the connect(2) function used to open the
// socket, not on the absolute path itself. Postgres changes the current directory to
// the data directory and uses a relative path to bind to the socket, and the relative
// path "./neon-communicator.socket" is always short, but when compute_ctl needs to
// open the socket, we need to use a full path, which can be arbitrarily long.
//
// There are a few ways we could work around this:
//
// 1. Change the current directory to the Postgres data directory and use a relative
// path in the connect(2) call. That's problematic because the current directory
// applies to the whole process. We could change the current directory early in
// compute_ctl startup, and that might be a good idea anyway for other reasons too:
// it would be more robust if the data directory is moved around or unlinked for
// some reason, and you would be less likely to accidentally litter other parts of
// the filesystem with e.g. temporary files. However, that's a pretty invasive
// change.
//
// 2. On Linux, you could open() the data directory, and refer to the the socket
// inside it as "/proc/self/fd/<fd>/neon-communicator.socket". But that's
// Linux-only.
//
// 3. Create a symbolic link to the socket with a shorter path, and use that.
//
// We use the symbolic link approach here. Hopefully the paths we use in production
// are shorter, so that we can open the socket directly, so that this hack is needed
// only in development.
let connect_result = if socket_path_len < 100 {
// We can open the path directly with no hacks.
tokio::net::UnixStream::connect(socket_path).await
} else {
// The path to the socket is too long. Create a symlink to it with a shorter path.
let short_path = std::env::temp_dir().join(format!(
"compute_ctl.short-socket.{}.{}",
std::process::id(),
tokio::task::id()
));
std::os::unix::fs::symlink(&socket_path, &short_path)?;
// Delete the symlink as soon as we have connected to it. There's a small chance
// of leaking if the process dies before we remove it, so try to keep that window
// as small as possible.
scopeguard::defer! {
if let Err(err) = std::fs::remove_file(&short_path) {
tracing::warn!("could not remove symlink \"{}\" created for socket: {}",
short_path.display(), err);
}
}
tracing::info!(
"created symlink \"{}\" for socket \"{}\", opening it now",
short_path.display(),
socket_path.display()
);
tokio::net::UnixStream::connect(&short_path).await
};
let stream = connect_result.context("connecting to communicator control socket")?;
let io = TokioIo::new(stream);
let (request_sender, connection) = hyper::client::conn::http1::handshake(io).await?;
// spawn a task to poll the connection and drive the HTTP state
tokio::spawn(async move {
if let Err(err) = connection.await {
eprintln!("Error in connection: {err}");
}
});
Ok(request_sender)
}

View File

@@ -1,12 +1,13 @@
use anyhow::{Context, Result};
use anyhow::{Context, Result, anyhow};
use chrono::{DateTime, Utc};
use compute_api::privilege::Privilege;
use compute_api::responses::{
ComputeConfig, ComputeCtlConfig, ComputeMetrics, ComputeStatus, LfcOffloadState,
LfcPrewarmState, PromoteState, TlsConfig,
LfcPrewarmState, TlsConfig,
};
use compute_api::spec::{
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PageserverProtocol, PgIdent,
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PageserverConnectionInfo,
PageserverShardConnectionInfo, PgIdent,
};
use futures::StreamExt;
use futures::future::join_all;
@@ -29,7 +30,8 @@ use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::time::{Duration, Instant};
use std::{env, fs};
use tokio::{spawn, sync::watch, task::JoinHandle, time};
use tokio::task::JoinHandle;
use tokio::{spawn, time};
use tracing::{Instrument, debug, error, info, instrument, warn};
use url::Url;
use utils::id::{TenantId, TimelineId};
@@ -74,20 +76,12 @@ const DEFAULT_INSTALLED_EXTENSIONS_COLLECTION_INTERVAL: u64 = 3600;
/// Static configuration params that don't change after startup. These mostly
/// come from the CLI args, or are derived from them.
#[derive(Clone, Debug)]
pub struct ComputeNodeParams {
/// The ID of the compute
pub compute_id: String,
/// Url type maintains proper escaping
// Url type maintains proper escaping
pub connstr: url::Url,
/// The name of the 'weak' superuser role, which we give to the users.
/// It follows the allow list approach, i.e., we take a standard role
/// and grant it extra permissions with explicit GRANTs here and there,
/// and core patches.
pub privileged_role_name: String,
pub resize_swap_on_bind: bool,
pub set_disk_quota_for_fs: Option<String>,
@@ -113,13 +107,6 @@ pub struct ComputeNodeParams {
/// Interval for installed extensions collection
pub installed_extensions_collection_interval: Arc<AtomicU64>,
/// Hadron instance ID of the compute node.
pub instance_id: Option<String>,
/// Timeout of PG compute startup in the Init state.
pub pg_init_timeout: Option<Duration>,
// Path to the `pg_isready` binary.
pub pg_isready_bin: String,
pub lakebase_mode: bool,
}
type TaskHandle = Mutex<Option<JoinHandle<()>>>;
@@ -161,7 +148,6 @@ pub struct RemoteExtensionMetrics {
#[derive(Clone, Debug)]
pub struct ComputeState {
pub start_time: DateTime<Utc>,
pub pg_start_time: Option<DateTime<Utc>>,
pub status: ComputeStatus,
/// Timestamp of the last Postgres activity. It could be `None` if
/// compute wasn't used since start.
@@ -190,7 +176,6 @@ pub struct ComputeState {
/// WAL flush LSN that is set after terminating Postgres and syncing safekeepers if
/// mode == ComputeMode::Primary. None otherwise
pub terminate_flush_lsn: Option<Lsn>,
pub promote_state: Option<watch::Receiver<PromoteState>>,
pub metrics: ComputeMetrics,
}
@@ -199,7 +184,6 @@ impl ComputeState {
pub fn new() -> Self {
Self {
start_time: Utc::now(),
pg_start_time: None,
status: ComputeStatus::Empty,
last_active: None,
error: None,
@@ -209,7 +193,6 @@ impl ComputeState {
lfc_prewarm_state: LfcPrewarmState::default(),
lfc_offload_state: LfcOffloadState::default(),
terminate_flush_lsn: None,
promote_state: None,
}
}
@@ -242,7 +225,7 @@ pub struct ParsedSpec {
pub spec: ComputeSpec,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub pageserver_connstr: String,
pub pageserver_conninfo: PageserverConnectionInfo,
pub safekeeper_connstrings: Vec<String>,
pub storage_auth_token: Option<String>,
/// k8s dns name and port
@@ -289,6 +272,27 @@ impl ParsedSpec {
}
}
fn extract_pageserver_conninfo_from_guc(
pageserver_connstring_guc: &str,
) -> PageserverConnectionInfo {
PageserverConnectionInfo {
shards: pageserver_connstring_guc
.split(',')
.enumerate()
.map(|(i, connstr)| {
(
i as u32,
PageserverShardConnectionInfo {
libpq_url: Some(connstr.to_string()),
grpc_url: None,
},
)
})
.collect(),
prefer_grpc: false,
}
}
impl TryFrom<ComputeSpec> for ParsedSpec {
type Error = String;
fn try_from(spec: ComputeSpec) -> Result<Self, String> {
@@ -298,11 +302,17 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
// For backwards-compatibility, the top-level fields in the spec file
// may be empty. In that case, we need to dig them from the GUCs in the
// cluster.settings field.
let pageserver_connstr = spec
.pageserver_connstring
.clone()
.or_else(|| spec.cluster.settings.find("neon.pageserver_connstring"))
.ok_or("pageserver connstr should be provided")?;
let pageserver_conninfo = match &spec.pageserver_connection_info {
Some(x) => x.clone(),
None => {
if let Some(guc) = spec.cluster.settings.find("neon.pageserver_connstring") {
extract_pageserver_conninfo_from_guc(&guc)
} else {
return Err("pageserver connstr should be provided".to_string());
}
}
};
let safekeeper_connstrings = if spec.safekeeper_connstrings.is_empty() {
if matches!(spec.mode, ComputeMode::Primary) {
spec.cluster
@@ -352,7 +362,7 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
let res = ParsedSpec {
spec,
pageserver_connstr,
pageserver_conninfo,
safekeeper_connstrings,
storage_auth_token,
tenant_id,
@@ -442,7 +452,7 @@ impl ComputeNode {
let mut new_state = ComputeState::new();
if let Some(spec) = config.spec {
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow!(msg))?;
new_state.pspec = Some(pspec);
}
@@ -488,7 +498,6 @@ impl ComputeNode {
port: this.params.external_http_port,
config: this.compute_ctl_config.clone(),
compute_id: this.params.compute_id.clone(),
instance_id: this.params.instance_id.clone(),
}
.launch(&this);
@@ -658,9 +667,6 @@ impl ComputeNode {
};
_this_entered = start_compute_span.enter();
// Hadron: Record postgres start time (used to enforce pg_init_timeout).
state_guard.pg_start_time.replace(Utc::now());
state_guard.set_status(ComputeStatus::Init, &self.state_changed);
compute_state = state_guard.clone()
}
@@ -977,20 +983,14 @@ impl ComputeNode {
None
};
let mut delay_exit = false;
let mut state = self.state.lock().unwrap();
state.terminate_flush_lsn = lsn;
let delay_exit = state.status == ComputeStatus::TerminationPendingFast;
if state.status == ComputeStatus::TerminationPendingFast
|| state.status == ComputeStatus::TerminationPendingImmediate
{
info!(
"Changing compute status from {} to {}",
state.status,
ComputeStatus::Terminated
);
if let ComputeStatus::TerminationPending { mode } = state.status {
state.status = ComputeStatus::Terminated;
self.state_changed.notify_all();
// we were asked to terminate gracefully, don't exit to avoid restart
delay_exit = mode == compute_api::responses::TerminateMode::Fast
}
drop(state);
@@ -1053,16 +1053,13 @@ impl ComputeNode {
fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
let spec = compute_state.pspec.as_ref().expect("spec must be set");
let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap();
let started = Instant::now();
let (connected, size) = match PageserverProtocol::from_connstring(shard0_connstr)? {
PageserverProtocol::Libpq => self.try_get_basebackup_libpq(spec, lsn)?,
PageserverProtocol::Grpc => self.try_get_basebackup_grpc(spec, lsn)?,
let (connected, size) = if spec.pageserver_conninfo.prefer_grpc {
self.try_get_basebackup_grpc(spec, lsn)?
} else {
self.try_get_basebackup_libpq(spec, lsn)?
};
self.fix_zenith_signal_neon_signal()?;
let mut state = self.state.lock().unwrap();
state.metrics.pageserver_connect_micros =
connected.duration_since(started).as_micros() as u64;
@@ -1072,44 +1069,24 @@ impl ComputeNode {
Ok(())
}
/// Move the Zenith signal file to Neon signal file location.
/// This makes Compute compatible with older PageServers that don't yet
/// know about the Zenith->Neon rename.
fn fix_zenith_signal_neon_signal(&self) -> Result<()> {
let datadir = Path::new(&self.params.pgdata);
let neonsig = datadir.join("neon.signal");
if neonsig.is_file() {
return Ok(());
}
let zenithsig = datadir.join("zenith.signal");
if zenithsig.is_file() {
fs::copy(zenithsig, neonsig)?;
}
Ok(())
}
/// Fetches a basebackup via gRPC. The connstring must use grpc://. Returns the timestamp when
/// the connection was established, and the (compressed) size of the basebackup.
fn try_get_basebackup_grpc(&self, spec: &ParsedSpec, lsn: Lsn) -> Result<(Instant, usize)> {
let shard0_connstr = spec
.pageserver_connstr
.split(',')
.next()
.unwrap()
.to_string();
let shard_index = match spec.pageserver_connstr.split(',').count() as u8 {
let shard0 = spec
.pageserver_conninfo
.shards
.get(&0)
.expect("shard 0 connection info missing");
let shard0_url = shard0.grpc_url.clone().expect("no grpc_url for shard 0");
let shard_index = match spec.pageserver_conninfo.shards.len() as u8 {
0 | 1 => ShardIndex::unsharded(),
count => ShardIndex::new(ShardNumber(0), ShardCount(count)),
};
let (reader, connected) = tokio::runtime::Handle::current().block_on(async move {
let mut client = page_api::Client::connect(
shard0_connstr,
shard0_url,
spec.tenant_id,
spec.timeline_id,
shard_index,
@@ -1144,8 +1121,13 @@ impl ComputeNode {
/// Fetches a basebackup via libpq. The connstring must use postgresql://. Returns the timestamp
/// when the connection was established, and the (compressed) size of the basebackup.
fn try_get_basebackup_libpq(&self, spec: &ParsedSpec, lsn: Lsn) -> Result<(Instant, usize)> {
let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap();
let mut config = postgres::Config::from_str(shard0_connstr)?;
let shard0 = spec
.pageserver_conninfo
.shards
.get(&0)
.expect("shard 0 connection info missing");
let shard0_connstr = shard0.libpq_url.clone().expect("no libpq_url for shard 0");
let mut config = postgres::Config::from_str(&shard0_connstr)?;
// Use the storage auth token from the config file, if given.
// Note: this overrides any password set in the connection string.
@@ -1231,10 +1213,7 @@ impl ComputeNode {
return result;
}
Err(ref e) if attempts < max_attempts => {
warn!(
"Failed to get basebackup: {} (attempt {}/{})",
e, attempts, max_attempts
);
warn!("Failed to get basebackup: {e:?} (attempt {attempts}/{max_attempts})");
std::thread::sleep(std::time::Duration::from_millis(retry_period_ms as u64));
retry_period_ms *= 1.5;
}
@@ -1307,7 +1286,9 @@ impl ComputeNode {
// In case of error, log and fail the check, but don't crash.
// We're playing it safe because these errors could be transient
// and we don't yet retry.
// and we don't yet retry. Also being careful here allows us to
// be backwards compatible with safekeepers that don't have the
// TIMELINE_STATUS API yet.
if responses.len() < quorum {
error!(
"failed sync safekeepers check {:?} {:?} {:?}",
@@ -1410,7 +1391,6 @@ impl ComputeNode {
self.create_pgdata()?;
config::write_postgres_conf(
pgdata_path,
&self.params,
&pspec.spec,
self.params.internal_http_port,
tls_config,
@@ -1442,19 +1422,11 @@ impl ComputeNode {
}
};
info!(
"getting basebackup@{} from pageserver {}",
lsn, &pspec.pageserver_connstr
);
self.get_basebackup(compute_state, lsn).with_context(|| {
format!(
"failed to get basebackup@{} from pageserver {}",
lsn, &pspec.pageserver_connstr
)
})?;
self.get_basebackup(compute_state, lsn)
.with_context(|| format!("failed to get basebackup@{lsn}"))?;
// Update pg_hba.conf received with basebackup.
update_pg_hba(pgdata_path, None)?;
update_pg_hba(pgdata_path)?;
// Place pg_dynshmem under /dev/shm. This allows us to use
// 'dynamic_shared_memory_type = mmap' so that the files are placed in
@@ -1759,8 +1731,6 @@ impl ComputeNode {
}
// Run migrations separately to not hold up cold starts
let lakebase_mode = self.params.lakebase_mode;
let params = self.params.clone();
tokio::spawn(async move {
let mut conf = conf.as_ref().clone();
conf.application_name("compute_ctl:migrations");
@@ -1772,7 +1742,7 @@ impl ComputeNode {
eprintln!("connection error: {e}");
}
});
if let Err(e) = handle_migrations(params, &mut client, lakebase_mode).await {
if let Err(e) = handle_migrations(&mut client).await {
error!("Failed to run migrations: {}", e);
}
}
@@ -1788,34 +1758,6 @@ impl ComputeNode {
Ok::<(), anyhow::Error>(())
}
// Signal to the configurator to refresh the configuration by pulling a new spec from the HCC.
// Note that this merely triggers a notification on a condition variable the configurator thread
// waits on. The configurator thread (in configurator.rs) pulls the new spec from the HCC and
// applies it.
pub async fn signal_refresh_configuration(&self) -> Result<()> {
let states_allowing_configuration_refresh = [
ComputeStatus::Running,
ComputeStatus::Failed,
// ComputeStatus::RefreshConfigurationPending,
];
let state = self.state.lock().expect("state lock poisoned");
if states_allowing_configuration_refresh.contains(&state.status) {
// state.status = ComputeStatus::RefreshConfigurationPending;
self.state_changed.notify_all();
Ok(())
} else if state.status == ComputeStatus::Init {
// If the compute is in Init state, we can't refresh the configuration immediately,
// but we should be able to do that soon.
Ok(())
} else {
Err(anyhow::anyhow!(
"Cannot refresh compute configuration in state {:?}",
state.status
))
}
}
// Wrapped this around `pg_ctl reload`, but right now we don't use
// `pg_ctl` for start / stop.
#[instrument(skip_all)]
@@ -1879,14 +1821,11 @@ impl ComputeNode {
let pgdata_path = Path::new(&self.params.pgdata);
config::write_postgres_conf(
pgdata_path,
&self.params,
&spec,
self.params.internal_http_port,
tls_config,
)?;
self.pg_reload_conf()?;
if !spec.skip_pg_catalog_updates {
let max_concurrent_connections = spec.reconfigure_concurrency;
// Temporarily reset max_cluster_size in config
@@ -1906,9 +1845,10 @@ impl ComputeNode {
Ok(())
})?;
self.pg_reload_conf()?;
}
self.pg_reload_conf()?;
let unknown_op = "unknown".to_string();
let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
info!(
@@ -1981,8 +1921,7 @@ impl ComputeNode {
// exit loop
ComputeStatus::Failed
| ComputeStatus::TerminationPendingFast
| ComputeStatus::TerminationPendingImmediate
| ComputeStatus::TerminationPending { .. }
| ComputeStatus::Terminated => break 'cert_update,
// wait
@@ -2148,7 +2087,7 @@ LIMIT 100",
self.params
.remote_ext_base_url
.as_ref()
.ok_or(DownloadError::BadInput(anyhow::anyhow!(
.ok_or(DownloadError::BadInput(anyhow!(
"Remote extensions storage is not configured",
)))?;
@@ -2344,7 +2283,7 @@ LIMIT 100",
let remote_extensions = spec
.remote_extensions
.as_ref()
.ok_or(anyhow::anyhow!("Remote extensions are not configured"))?;
.ok_or(anyhow!("Remote extensions are not configured"))?;
info!("parse shared_preload_libraries from spec.cluster.settings");
let mut libs_vec = Vec::new();
@@ -2423,22 +2362,22 @@ LIMIT 100",
/// The operation will time out after a specified duration.
pub fn wait_timeout_while_pageserver_connstr_unchanged(&self, duration: Duration) {
let state = self.state.lock().unwrap();
let old_pageserver_connstr = state
let old_pageserver_conninfo = state
.pspec
.as_ref()
.expect("spec must be set")
.pageserver_connstr
.pageserver_conninfo
.clone();
let mut unchanged = true;
let _ = self
.state_changed
.wait_timeout_while(state, duration, |s| {
let pageserver_connstr = &s
let pageserver_conninfo = &s
.pspec
.as_ref()
.expect("spec must be set")
.pageserver_connstr;
unchanged = pageserver_connstr == &old_pageserver_connstr;
.pageserver_conninfo;
unchanged = pageserver_conninfo == &old_pageserver_conninfo;
unchanged
})
.unwrap();
@@ -2492,31 +2431,14 @@ LIMIT 100",
pub fn spawn_lfc_offload_task(self: &Arc<Self>, interval: Duration) {
self.terminate_lfc_offload_task();
let secs = interval.as_secs();
info!("spawning lfc offload worker with {secs}s interval");
let this = self.clone();
info!("spawning LFC offload worker with {secs}s interval");
let handle = spawn(async move {
let mut interval = time::interval(interval);
interval.tick().await; // returns immediately
loop {
interval.tick().await;
let prewarm_state = this.state.lock().unwrap().lfc_prewarm_state.clone();
// Do not offload LFC state if we are currently prewarming or any issue occurred.
// If we'd do that, we might override the LFC state in endpoint storage with some
// incomplete state. Imagine a situation:
// 1. Endpoint started with `autoprewarm: true`
// 2. While prewarming is not completed, we upload the new incomplete state
// 3. Compute gets interrupted and restarts
// 4. We start again and try to prewarm with the state from 2. instead of the previous complete state
if matches!(
prewarm_state,
LfcPrewarmState::Completed
| LfcPrewarmState::NotPrewarmed
| LfcPrewarmState::Skipped
) {
this.offload_lfc_async().await;
}
this.offload_lfc_async().await;
}
});
*self.lfc_offload_task.lock().unwrap() = Some(handle);
@@ -2533,11 +2455,19 @@ LIMIT 100",
// If the value is -1, we never suspend so set the value to default collection.
// If the value is 0, it means default, we will just continue to use the default.
if spec.suspend_timeout_seconds == -1 || spec.suspend_timeout_seconds == 0 {
info!(
"[NEON_EXT_INT_UPD] Spec Timeout: {}, New Timeout: {}",
spec.suspend_timeout_seconds, DEFAULT_INSTALLED_EXTENSIONS_COLLECTION_INTERVAL
);
self.params.installed_extensions_collection_interval.store(
DEFAULT_INSTALLED_EXTENSIONS_COLLECTION_INTERVAL,
std::sync::atomic::Ordering::SeqCst,
);
} else {
info!(
"[NEON_EXT_INT_UPD] Spec Timeout: {}",
spec.suspend_timeout_seconds
);
self.params.installed_extensions_collection_interval.store(
spec.suspend_timeout_seconds as u64,
std::sync::atomic::Ordering::SeqCst,
@@ -2555,7 +2485,7 @@ pub async fn installed_extensions(conf: tokio_postgres::Config) -> Result<()> {
serde_json::to_string(&extensions).expect("failed to serialize extensions list")
);
}
Err(err) => error!("could not get installed extensions: {err}"),
Err(err) => error!("could not get installed extensions: {err:?}"),
}
Ok(())
}

View File

@@ -70,7 +70,7 @@ impl ComputeNode {
}
};
let row = match client
.query_one("select * from neon.get_prewarm_info()", &[])
.query_one("select * from get_prewarm_info()", &[])
.await
{
Ok(row) => row,
@@ -89,8 +89,7 @@ impl ComputeNode {
self.state.lock().unwrap().lfc_offload_state.clone()
}
/// If there is a prewarm request ongoing, return `false`, `true` otherwise.
/// Has a failpoint "compute-prewarm"
/// If there is a prewarm request ongoing, return false, true otherwise
pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
{
let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
@@ -102,24 +101,14 @@ impl ComputeNode {
let cloned = self.clone();
spawn(async move {
let state = match cloned.prewarm_impl(from_endpoint).await {
Ok(true) => LfcPrewarmState::Completed,
Ok(false) => {
info!(
"skipping LFC prewarm because LFC state is not found in endpoint storage"
);
LfcPrewarmState::Skipped
}
Err(err) => {
crate::metrics::LFC_PREWARM_ERRORS.inc();
error!(%err, "could not prewarm LFC");
LfcPrewarmState::Failed {
error: format!("{err:#}"),
}
}
let Err(err) = cloned.prewarm_impl(from_endpoint).await else {
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Completed;
return;
};
error!(%err);
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Failed {
error: err.to_string(),
};
cloned.state.lock().unwrap().lfc_prewarm_state = state;
});
true
}
@@ -130,25 +119,15 @@ impl ComputeNode {
EndpointStoragePair::from_spec_and_endpoint(state.pspec.as_ref().unwrap(), from_endpoint)
}
/// Request LFC state from endpoint storage and load corresponding pages into Postgres.
/// Returns a result with `false` if the LFC state is not found in endpoint storage.
async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<bool> {
async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?;
#[cfg(feature = "testing")]
fail::fail_point!("compute-prewarm", |_| {
bail!("prewarm configured to fail because of a failpoint")
});
info!(%url, "requesting LFC state from endpoint storage");
let request = Client::new().get(&url).bearer_auth(token);
let res = request.send().await.context("querying endpoint storage")?;
match res.status() {
StatusCode::OK => (),
StatusCode::NOT_FOUND => {
return Ok(false);
}
status => bail!("{status} querying endpoint storage"),
let status = res.status();
if status != StatusCode::OK {
bail!("{status} querying endpoint storage")
}
let mut uncompressed = Vec::new();
@@ -161,18 +140,15 @@ impl ComputeNode {
.await
.context("decoding LFC state")?;
let uncompressed_len = uncompressed.len();
info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}, loading into Postgres");
info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}, loading into postgres");
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?
.query_one("select neon.prewarm_local_cache($1)", &[&uncompressed])
.query_one("select prewarm_local_cache($1)", &[&uncompressed])
.await
.context("loading LFC state into postgres")
.map(|_| ())?;
Ok(true)
.map(|_| ())
}
/// If offload request is ongoing, return false, true otherwise
@@ -200,53 +176,40 @@ impl ComputeNode {
async fn offload_lfc_with_state_update(&self) {
crate::metrics::LFC_OFFLOADS.inc();
let Err(err) = self.offload_lfc_impl().await else {
self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed;
return;
};
crate::metrics::LFC_OFFLOAD_ERRORS.inc();
error!(%err, "could not offload LFC state to endpoint storage");
error!(%err);
self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
error: format!("{err:#}"),
error: err.to_string(),
};
}
async fn offload_lfc_impl(&self) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
info!(%url, "requesting LFC state from Postgres");
let row = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?
.query_one("select neon.get_local_cache_state()", &[])
.await
.context("querying LFC state")?;
let state = row
.try_get::<usize, Option<&[u8]>>(0)
.context("deserializing LFC state")?;
let Some(state) = state else {
info!(%url, "empty LFC state, not exporting");
return Ok(());
};
info!(%url, "requesting LFC state from postgres");
let mut compressed = Vec::new();
ZstdEncoder::new(state)
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?
.query_one("select get_local_cache_state()", &[])
.await
.context("querying LFC state")?
.try_get::<usize, &[u8]>(0)
.context("deserializing LFC state")
.map(ZstdEncoder::new)?
.read_to_end(&mut compressed)
.await
.context("compressing LFC state")?;
let compressed_len = compressed.len();
info!(%url, "downloaded LFC state, compressed size {compressed_len}, writing to endpoint storage");
let request = Client::new().put(url).bearer_auth(token).body(compressed);
match request.send().await {
Ok(res) if res.status() == StatusCode::OK => Ok(()),
Ok(res) => bail!(
"Request to endpoint storage failed with status: {}",
res.status()
),
Ok(res) => bail!("Error writing to endpoint storage: {}", res.status()),
Err(err) => Err(err).context("writing to endpoint storage"),
}
}

View File

@@ -1,166 +0,0 @@
use crate::compute::ComputeNode;
use anyhow::{Context, Result, bail};
use compute_api::responses::{LfcPrewarmState, PromoteConfig, PromoteState};
use compute_api::spec::ComputeMode;
use itertools::Itertools;
use std::collections::HashMap;
use std::{sync::Arc, time::Duration};
use tokio::time::sleep;
use tracing::info;
use utils::lsn::Lsn;
impl ComputeNode {
/// Returns only when promote fails or succeeds. If a network error occurs
/// and http client disconnects, this does not stop promotion, and subsequent
/// calls block until promote finishes.
/// Called by control plane on secondary after primary endpoint is terminated
/// Has a failpoint "compute-promotion"
pub async fn promote(self: &Arc<Self>, cfg: PromoteConfig) -> PromoteState {
let cloned = self.clone();
let promote_fn = async move || {
let Err(err) = cloned.promote_impl(cfg).await else {
return PromoteState::Completed;
};
tracing::error!(%err, "promoting");
PromoteState::Failed {
error: format!("{err:#}"),
}
};
let start_promotion = || {
let (tx, rx) = tokio::sync::watch::channel(PromoteState::NotPromoted);
tokio::spawn(async move { tx.send(promote_fn().await) });
rx
};
let mut task;
// self.state is unlocked after block ends so we lock it in promote_impl
// and task.changed() is reached
{
task = self
.state
.lock()
.unwrap()
.promote_state
.get_or_insert_with(start_promotion)
.clone()
}
task.changed().await.expect("promote sender dropped");
task.borrow().clone()
}
async fn promote_impl(&self, mut cfg: PromoteConfig) -> Result<()> {
{
let state = self.state.lock().unwrap();
let mode = &state.pspec.as_ref().unwrap().spec.mode;
if *mode != ComputeMode::Replica {
bail!("{} is not replica", mode.to_type_str());
}
// we don't need to query Postgres so not self.lfc_prewarm_state()
match &state.lfc_prewarm_state {
LfcPrewarmState::NotPrewarmed | LfcPrewarmState::Prewarming => {
bail!("prewarm not requested or pending")
}
LfcPrewarmState::Failed { error } => {
tracing::warn!(%error, "replica prewarm failed")
}
_ => {}
}
}
let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?;
let primary_lsn = cfg.wal_flush_lsn;
let mut last_wal_replay_lsn: Lsn = Lsn::INVALID;
const RETRIES: i32 = 20;
for i in 0..=RETRIES {
let row = client
.query_one("SELECT pg_last_wal_replay_lsn()", &[])
.await
.context("getting last replay lsn")?;
let lsn: u64 = row.get::<usize, postgres_types::PgLsn>(0).into();
last_wal_replay_lsn = lsn.into();
if last_wal_replay_lsn >= primary_lsn {
break;
}
info!("Try {i}, replica lsn {last_wal_replay_lsn}, primary lsn {primary_lsn}");
sleep(Duration::from_secs(1)).await;
}
if last_wal_replay_lsn < primary_lsn {
bail!("didn't catch up with primary in {RETRIES} retries");
}
// using $1 doesn't work with ALTER SYSTEM SET
let safekeepers_sql = format!(
"ALTER SYSTEM SET neon.safekeepers='{}'",
cfg.spec.safekeeper_connstrings.join(",")
);
client
.query(&safekeepers_sql, &[])
.await
.context("setting safekeepers")?;
client
.query("SELECT pg_reload_conf()", &[])
.await
.context("reloading postgres config")?;
#[cfg(feature = "testing")]
fail::fail_point!("compute-promotion", |_| {
bail!("promotion configured to fail because of a failpoint")
});
let row = client
.query_one("SELECT * FROM pg_promote()", &[])
.await
.context("pg_promote")?;
if !row.get::<usize, bool>(0) {
bail!("pg_promote() returned false");
}
let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?;
let row = client
.query_one("SHOW transaction_read_only", &[])
.await
.context("getting transaction_read_only")?;
if row.get::<usize, &str>(0) == "on" {
bail!("replica in read only mode after promotion");
}
{
let mut state = self.state.lock().unwrap();
let spec = &mut state.pspec.as_mut().unwrap().spec;
spec.mode = ComputeMode::Primary;
let new_conf = cfg.spec.cluster.postgresql_conf.as_mut().unwrap();
let existing_conf = spec.cluster.postgresql_conf.as_ref().unwrap();
Self::merge_spec(new_conf, existing_conf);
}
info!("applied new spec, reconfiguring as primary");
self.reconfigure()
}
/// Merge old and new Postgres conf specs to apply on secondary.
/// Change new spec's port and safekeepers since they are supplied
/// differenly
fn merge_spec(new_conf: &mut String, existing_conf: &str) {
let mut new_conf_set: HashMap<&str, &str> = new_conf
.split_terminator('\n')
.map(|e| e.split_once("=").expect("invalid item"))
.collect();
new_conf_set.remove("neon.safekeepers");
let existing_conf_set: HashMap<&str, &str> = existing_conf
.split_terminator('\n')
.map(|e| e.split_once("=").expect("invalid item"))
.collect();
new_conf_set.insert("port", existing_conf_set["port"]);
*new_conf = new_conf_set
.iter()
.map(|(k, v)| format!("{k}={v}"))
.join("\n");
}
}

View File

@@ -9,7 +9,6 @@ use std::path::Path;
use compute_api::responses::TlsConfig;
use compute_api::spec::{ComputeAudit, ComputeMode, ComputeSpec, GenericOption};
use crate::compute::ComputeNodeParams;
use crate::pg_helpers::{
GenericOptionExt, GenericOptionsSearch, PgOptionsSerialize, escape_conf_value,
};
@@ -42,7 +41,6 @@ pub fn line_in_file(path: &Path, line: &str) -> Result<bool> {
/// Create or completely rewrite configuration file specified by `path`
pub fn write_postgres_conf(
pgdata_path: &Path,
params: &ComputeNodeParams,
spec: &ComputeSpec,
extension_server_port: u16,
tls_config: &Option<TlsConfig>,
@@ -56,14 +54,55 @@ pub fn write_postgres_conf(
writeln!(file, "{conf}")?;
}
// Stripe size GUC should be defined prior to connection string
if let Some(stripe_size) = spec.shard_stripe_size {
writeln!(file, "neon.stripe_size={stripe_size}")?;
}
// Add options for connecting to storage
writeln!(file, "# Neon storage settings")?;
if let Some(s) = &spec.pageserver_connstring {
writeln!(file, "neon.pageserver_connstring={}", escape_conf_value(s))?;
if let Some(conninfo) = &spec.pageserver_connection_info {
let mut libpq_urls: Option<Vec<String>> = Some(Vec::new());
let mut grpc_urls: Option<Vec<String>> = Some(Vec::new());
for shardno in 0..conninfo.shards.len() {
let info = conninfo.shards.get(&(shardno as u32)).ok_or_else(|| {
anyhow::anyhow!("shard {shardno} missing from pageserver_connection_info shard map")
})?;
if let Some(url) = &info.libpq_url {
if let Some(ref mut urls) = libpq_urls {
urls.push(url.clone());
}
} else {
libpq_urls = None
}
if let Some(url) = &info.grpc_url {
if let Some(ref mut urls) = grpc_urls {
urls.push(url.clone());
}
} else {
grpc_urls = None
}
}
if let Some(libpq_urls) = libpq_urls {
writeln!(
file,
"neon.pageserver_connstring={}",
escape_conf_value(&libpq_urls.join(","))
)?;
} else {
writeln!(file, "# no neon.pageserver_connstring")?;
}
if let Some(grpc_urls) = grpc_urls {
writeln!(
file,
"neon.pageserver_grpc_urls={}",
escape_conf_value(&grpc_urls.join(","))
)?;
} else {
writeln!(file, "# no neon.pageserver_grpc_urls")?;
}
}
if let Some(stripe_size) = spec.shard_stripe_size {
writeln!(file, "neon.stripe_size={stripe_size}")?;
}
if !spec.safekeeper_connstrings.is_empty() {
let mut neon_safekeepers_value = String::new();
@@ -164,12 +203,6 @@ pub fn write_postgres_conf(
}
}
writeln!(
file,
"neon.privileged_role_name={}",
escape_conf_value(params.privileged_role_name.as_str())
)?;
// If there are any extra options in the 'settings' field, append those
if spec.cluster.settings.is_some() {
writeln!(file, "# Managed by compute_ctl: begin")?;

View File

@@ -1,60 +0,0 @@
use metrics::{
IntCounter, IntGaugeVec, core::Collector, proto::MetricFamily, register_int_counter,
register_int_gauge_vec,
};
use once_cell::sync::Lazy;
// Counter keeping track of the number of PageStream request errors reported by Postgres.
// An error is registered every time Postgres calls compute_ctl's /refresh_configuration API.
// Postgres will invoke this API if it detected trouble with PageStream requests (get_page@lsn,
// get_base_backup, etc.) it sends to any pageserver. An increase in this counter value typically
// indicates Postgres downtime, as PageStream requests are critical for Postgres to function.
pub static POSTGRES_PAGESTREAM_REQUEST_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pg_cctl_pagestream_request_errors_total",
"Number of PageStream request errors reported by the postgres process"
)
.expect("failed to define a metric")
});
// Counter keeping track of the number of compute configuration errors due to Postgres statement
// timeouts. An error is registered every time `ComputeNode::reconfigure()` fails due to Postgres
// error code 57014 (query cancelled). This statement timeout typically occurs when postgres is
// stuck in a problematic retry loop when the PS is reject its connection requests (usually due
// to PG pointing at the wrong PS). We should investigate the root cause when this counter value
// increases by checking PG and PS logs.
pub static COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pg_cctl_configure_statement_timeout_errors_total",
"Number of compute configuration errors due to Postgres statement timeouts."
)
.expect("failed to define a metric")
});
pub static COMPUTE_ATTACHED: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pg_cctl_attached",
"Compute node attached status (1 if attached)",
&[
"pg_compute_id",
"pg_instance_id",
"tenant_id",
"timeline_id"
]
)
.expect("failed to define a metric")
});
pub fn collect() -> Vec<MetricFamily> {
let mut metrics = Vec::new();
metrics.extend(POSTGRES_PAGESTREAM_REQUEST_ERRORS.collect());
metrics.extend(COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS.collect());
metrics.extend(COMPUTE_ATTACHED.collect());
metrics
}
pub fn initialize_metrics() {
Lazy::force(&POSTGRES_PAGESTREAM_REQUEST_ERRORS);
Lazy::force(&COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS);
Lazy::force(&COMPUTE_ATTACHED);
}

View File

@@ -16,29 +16,13 @@ use crate::http::JsonResponse;
#[derive(Clone, Debug)]
pub(in crate::http) struct Authorize {
compute_id: String,
// BEGIN HADRON
// Hadron instance ID. Only set if it's a Lakebase V1 a.k.a. Hadron instance.
instance_id: Option<String>,
// END HADRON
jwks: JwkSet,
validation: Validation,
}
impl Authorize {
pub fn new(compute_id: String, instance_id: Option<String>, jwks: JwkSet) -> Self {
pub fn new(compute_id: String, jwks: JwkSet) -> Self {
let mut validation = Validation::new(Algorithm::EdDSA);
// BEGIN HADRON
let use_rsa = jwks.keys.iter().any(|jwk| {
jwk.common
.key_algorithm
.is_some_and(|alg| alg == jsonwebtoken::jwk::KeyAlgorithm::RS256)
});
if use_rsa {
validation = Validation::new(Algorithm::RS256);
}
// END HADRON
validation.validate_exp = true;
// Unused by the control plane
validation.validate_nbf = false;
@@ -50,7 +34,6 @@ impl Authorize {
Self {
compute_id,
instance_id,
jwks,
validation,
}
@@ -64,20 +47,10 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
fn authorize(&mut self, mut request: Request<Body>) -> Self::Future {
let compute_id = self.compute_id.clone();
let is_hadron_instance = self.instance_id.is_some();
let jwks = self.jwks.clone();
let validation = self.validation.clone();
Box::pin(async move {
// BEGIN HADRON
// In Hadron deployments the "external" HTTP endpoint on compute_ctl can only be
// accessed by trusted components (enforced by dblet network policy), so we can bypass
// all auth here.
if is_hadron_instance {
return Ok(request);
}
// END HADRON
let TypedHeader(Authorization(bearer)) = request
.extract_parts::<TypedHeader<Authorization<Bearer>>>()
.await

View File

@@ -83,87 +83,6 @@ paths:
schema:
$ref: "#/components/schemas/DbsAndRoles"
/promote:
post:
tags:
- Promotion
summary: Promote secondary replica to primary
description: ""
operationId: promoteReplica
requestBody:
description: Promote requests data
required: true
content:
application/json:
schema:
$ref: "#/components/schemas/ComputeSchemaWithLsn"
responses:
200:
description: Promote succeeded or wasn't started
content:
application/json:
schema:
$ref: "#/components/schemas/PromoteState"
500:
description: Promote failed
content:
application/json:
schema:
$ref: "#/components/schemas/PromoteState"
/lfc/prewarm:
post:
summary: Request LFC Prewarm
parameters:
- name: from_endpoint
in: query
schema:
type: string
description: ""
operationId: lfcPrewarm
responses:
202:
description: LFC prewarm started
429:
description: LFC prewarm ongoing
get:
tags:
- Prewarm
summary: Get LFC prewarm state
description: ""
operationId: getLfcPrewarmState
responses:
200:
description: Prewarm state
content:
application/json:
schema:
$ref: "#/components/schemas/LfcPrewarmState"
/lfc/offload:
post:
summary: Request LFC offload
description: ""
operationId: lfcOffload
responses:
202:
description: LFC offload started
429:
description: LFC offload ongoing
get:
tags:
- Prewarm
summary: Get LFC offloading state
description: ""
operationId: getLfcOffloadState
responses:
200:
description: Offload state
content:
application/json:
schema:
$ref: "#/components/schemas/LfcOffloadState"
/database_schema:
get:
tags:
@@ -297,7 +216,14 @@ paths:
content:
application/json:
schema:
$ref: "#/components/schemas/ComputeSchema"
type: object
required:
- spec
properties:
spec:
# XXX: I don't want to explain current spec in the OpenAPI format,
# as it could be changed really soon. Consider doing it later.
type: object
responses:
200:
description: Compute configuration finished.
@@ -364,28 +290,9 @@ paths:
summary: Terminate Postgres and wait for it to exit
description: ""
operationId: terminate
parameters:
- name: mode
in: query
description: "Terminate mode: fast (wait 30s before returning) and immediate"
required: false
schema:
type: string
enum: ["fast", "immediate"]
default: fast
responses:
200:
description: Result
content:
application/json:
schema:
$ref: "#/components/schemas/TerminateResponse"
201:
description: Result if compute is already terminated
content:
application/json:
schema:
$ref: "#/components/schemas/TerminateResponse"
412:
description: "wrong state"
content:
@@ -428,6 +335,15 @@ components:
total_startup_ms:
type: integer
Info:
type: object
description: Information about VM/Pod.
required:
- num_cpus
properties:
num_cpus:
type: integer
DbsAndRoles:
type: object
description: Databases and Roles
@@ -542,14 +458,11 @@ components:
type: string
enum:
- empty
- configuration_pending
- init
- running
- configuration
- failed
- termination_pending_fast
- termination_pending_immediate
- terminated
- running
- configuration_pending
- configuration
example: running
ExtensionInstallRequest:
@@ -584,76 +497,25 @@ components:
type: string
example: "1.0.0"
ComputeSchema:
InstalledExtensions:
type: object
required:
- spec
properties:
spec:
type: object
ComputeSchemaWithLsn:
type: object
required:
- spec
- wal_flush_lsn
properties:
spec:
$ref: "#/components/schemas/ComputeState"
wal_flush_lsn:
type: string
description: "last WAL flush LSN"
example: "0/028F10D8"
LfcPrewarmState:
type: object
required:
- status
- total
- prewarmed
- skipped
properties:
status:
description: LFC prewarm status
enum: [not_prewarmed, prewarming, completed, failed, skipped]
type: string
error:
description: LFC prewarm error, if any
type: string
total:
description: Total pages processed
type: integer
prewarmed:
description: Total pages prewarmed
type: integer
skipped:
description: Pages processed but not prewarmed
type: integer
LfcOffloadState:
type: object
required:
- status
properties:
status:
description: LFC offload status
enum: [not_offloaded, offloading, completed, failed]
type: string
error:
description: LFC offload error, if any
type: string
PromoteState:
type: object
required:
- status
properties:
status:
description: Promote result
enum: [not_promoted, completed, failed]
type: string
error:
description: Promote error, if any
type: string
extensions:
description: Contains list of installed extensions.
type: array
items:
type: object
properties:
extname:
type: string
version:
type: string
items:
type: string
n_databases:
type: integer
owned_by_superuser:
type: integer
SetRoleGrantsRequest:
type: object
@@ -682,17 +544,6 @@ components:
description: Role name.
example: "neon"
TerminateResponse:
type: object
required:
- lsn
properties:
lsn:
type: string
nullable: true
description: "last WAL flush LSN"
example: "0/028F10D8"
SetRoleGrantsResponse:
type: object
required:

View File

@@ -1,34 +0,0 @@
use crate::pg_isready::pg_isready;
use crate::{compute::ComputeNode, http::JsonResponse};
use axum::{extract::State, http::StatusCode, response::Response};
use std::sync::Arc;
/// NOTE: NOT ENABLED YET
/// Detect if the compute is alive.
/// Called by the liveness probe of the compute container.
pub(in crate::http) async fn hadron_liveness_probe(
State(compute): State<Arc<ComputeNode>>,
) -> Response {
let port = match compute.params.connstr.port() {
Some(port) => port,
None => {
return JsonResponse::error(
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to get the port from the connection string",
);
}
};
match pg_isready(&compute.params.pg_isready_bin, port) {
Ok(_) => {
// The connection is successful, so the compute is alive.
// Return a 200 OK response.
JsonResponse::success(StatusCode::OK, "ok")
}
Err(e) => {
tracing::error!("Hadron liveness probe failed: {}", e);
// The connection failed, so the compute is not alive.
// Return a 500 Internal Server Error response.
JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e)
}
}
}

View File

@@ -1,18 +1,10 @@
use std::path::Path;
use std::sync::Arc;
use anyhow::Context;
use axum::body::Body;
use axum::extract::State;
use axum::response::Response;
use http::StatusCode;
use http::header::CONTENT_TYPE;
use http_body_util::BodyExt;
use hyper::{Request, StatusCode};
use metrics::proto::MetricFamily;
use metrics::{Encoder, TextEncoder};
use crate::communicator_socket_client::connect_communicator_socket;
use crate::compute::ComputeNode;
use crate::http::JsonResponse;
use crate::metrics::collect;
@@ -39,42 +31,3 @@ pub(in crate::http) async fn get_metrics() -> Response {
.body(Body::from(buffer))
.unwrap()
}
/// Fetch and forward metrics from the Postgres neon extension's metrics
/// exporter that are used by autoscaling-agent.
///
/// The neon extension exposes these metrics over a Unix domain socket
/// in the data directory. That's not accessible directly from the outside
/// world, so we have this endpoint in compute_ctl to expose it
pub(in crate::http) async fn get_autoscaling_metrics(
State(compute): State<Arc<ComputeNode>>,
) -> Result<Response, Response> {
let pgdata = Path::new(&compute.params.pgdata);
// Connect to the communicator process's metrics socket
let mut metrics_client = connect_communicator_socket(pgdata)
.await
.map_err(|e| JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")))?;
// Make a request for /autoscaling_metrics
let request = Request::builder()
.method("GET")
.uri("/autoscaling_metrics")
.header("Host", "localhost") // hyper requires Host, even though the server won't care
.body(Body::from(""))
.unwrap();
let resp = metrics_client
.send_request(request)
.await
.context("fetching metrics from Postgres metrics service")
.map_err(|e| JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")))?;
// Build a response that just forwards the response we got.
let mut response = Response::builder();
response = response.status(resp.status());
if let Some(content_type) = resp.headers().get(CONTENT_TYPE) {
response = response.header(CONTENT_TYPE, content_type);
}
let body = tonic::service::AxumBody::from_stream(resp.into_body().into_data_stream());
Ok(response.body(body).unwrap())
}

View File

@@ -10,13 +10,10 @@ pub(in crate::http) mod extension_server;
pub(in crate::http) mod extensions;
pub(in crate::http) mod failpoints;
pub(in crate::http) mod grants;
pub(in crate::http) mod hadron_liveness_probe;
pub(in crate::http) mod insights;
pub(in crate::http) mod lfc;
pub(in crate::http) mod metrics;
pub(in crate::http) mod metrics_json;
pub(in crate::http) mod promote;
pub(in crate::http) mod refresh_configuration;
pub(in crate::http) mod status;
pub(in crate::http) mod terminate;

View File

@@ -1,14 +0,0 @@
use crate::http::JsonResponse;
use axum::extract::Json;
use http::StatusCode;
pub(in crate::http) async fn promote(
compute: axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>,
Json(cfg): Json<compute_api::responses::PromoteConfig>,
) -> axum::response::Response {
let state = compute.promote(cfg).await;
if let compute_api::responses::PromoteState::Failed { error: _ } = state {
return JsonResponse::create_response(StatusCode::INTERNAL_SERVER_ERROR, state);
}
JsonResponse::success(StatusCode::OK, state)
}

View File

@@ -1,34 +0,0 @@
// This file is added by Hadron
use std::sync::Arc;
use axum::{
extract::State,
response::{IntoResponse, Response},
};
use http::StatusCode;
use tracing::debug;
use crate::compute::ComputeNode;
// use crate::hadron_metrics::POSTGRES_PAGESTREAM_REQUEST_ERRORS;
use crate::http::JsonResponse;
// The /refresh_configuration POST method is used to nudge compute_ctl to pull a new spec
// from the HCC and attempt to reconfigure Postgres with the new spec. The method does not wait
// for the reconfiguration to complete. Rather, it simply delivers a signal that will cause
// configuration to be reloaded in a best effort manner. Invocation of this method does not
// guarantee that a reconfiguration will occur. The caller should consider keep sending this
// request while it believes that the compute configuration is out of date.
pub(in crate::http) async fn refresh_configuration(
State(compute): State<Arc<ComputeNode>>,
) -> Response {
debug!("serving /refresh_configuration POST request");
// POSTGRES_PAGESTREAM_REQUEST_ERRORS.inc();
match compute.signal_refresh_configuration().await {
Ok(_) => StatusCode::OK.into_response(),
Err(e) => {
tracing::error!("error handling /refresh_configuration request: {}", e);
JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e)
}
}
}

View File

@@ -3,7 +3,7 @@ use crate::http::JsonResponse;
use axum::extract::State;
use axum::response::Response;
use axum_extra::extract::OptionalQuery;
use compute_api::responses::{ComputeStatus, TerminateMode, TerminateResponse};
use compute_api::responses::{ComputeStatus, TerminateResponse};
use http::StatusCode;
use serde::Deserialize;
use std::sync::Arc;
@@ -12,7 +12,7 @@ use tracing::info;
#[derive(Deserialize, Default)]
pub struct TerminateQuery {
mode: TerminateMode,
mode: compute_api::responses::TerminateMode,
}
/// Terminate the compute.
@@ -24,16 +24,16 @@ pub(in crate::http) async fn terminate(
{
let mut state = compute.state.lock().unwrap();
if state.status == ComputeStatus::Terminated {
let response = TerminateResponse {
lsn: state.terminate_flush_lsn,
};
return JsonResponse::success(StatusCode::CREATED, response);
return JsonResponse::success(StatusCode::CREATED, state.terminate_flush_lsn);
}
if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) {
return JsonResponse::invalid_status(state.status);
}
state.set_status(mode.into(), &compute.state_changed);
state.set_status(
ComputeStatus::TerminationPending { mode },
&compute.state_changed,
);
}
forward_termination_signal(false);

View File

@@ -23,11 +23,10 @@ use super::{
middleware::authorize::Authorize,
routes::{
check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions,
grants, insights, lfc, metrics, metrics_json, promote, status, terminate,
grants, insights, lfc, metrics, metrics_json, status, terminate,
},
};
use crate::compute::ComputeNode;
use crate::http::routes::{hadron_liveness_probe, refresh_configuration};
/// `compute_ctl` has two servers: internal and external. The internal server
/// binds to the loopback interface and handles communication from clients on
@@ -44,7 +43,6 @@ pub enum Server {
port: u16,
config: ComputeCtlConfig,
compute_id: String,
instance_id: Option<String>,
},
}
@@ -69,12 +67,7 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
post(extension_server::download_extension),
)
.route("/extensions", post(extensions::install_extension))
.route("/grants", post(grants::add_grant))
// Hadron: Compute-initiated configuration refresh
.route(
"/refresh_configuration",
post(refresh_configuration::refresh_configuration),
);
.route("/grants", post(grants::add_grant));
// Add in any testing support
if cfg!(feature = "testing") {
@@ -86,22 +79,14 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
router
}
Server::External {
config,
compute_id,
instance_id,
..
config, compute_id, ..
} => {
let unauthenticated_router = Router::<Arc<ComputeNode>>::new()
.route("/metrics", get(metrics::get_metrics))
.route(
"/autoscaling_metrics",
get(metrics::get_autoscaling_metrics),
);
let unauthenticated_router =
Router::<Arc<ComputeNode>>::new().route("/metrics", get(metrics::get_metrics));
let authenticated_router = Router::<Arc<ComputeNode>>::new()
.route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm))
.route("/lfc/offload", get(lfc::offload_state).post(lfc::offload))
.route("/promote", post(promote::promote))
.route("/check_writability", post(check_writability::is_writable))
.route("/configure", post(configure::configure))
.route("/database_schema", get(database_schema::get_schema_dump))
@@ -110,13 +95,8 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
.route("/metrics.json", get(metrics_json::get_metrics))
.route("/status", get(status::get_status))
.route("/terminate", post(terminate::terminate))
.route(
"/hadron_liveness_probe",
get(hadron_liveness_probe::hadron_liveness_probe),
)
.layer(AsyncRequireAuthorizationLayer::new(Authorize::new(
compute_id.clone(),
instance_id.clone(),
config.jwks.clone(),
)));

View File

@@ -2,8 +2,6 @@ use std::collections::HashMap;
use anyhow::Result;
use compute_api::responses::{InstalledExtension, InstalledExtensions};
use once_cell::sync::Lazy;
use tokio_postgres::error::Error as PostgresError;
use tokio_postgres::{Client, Config, NoTls};
use crate::metrics::INSTALLED_EXTENSIONS;
@@ -12,7 +10,7 @@ use crate::metrics::INSTALLED_EXTENSIONS;
/// and to make database listing query here more explicit.
///
/// Limit the number of databases to 500 to avoid excessive load.
async fn list_dbs(client: &mut Client) -> Result<Vec<String>, PostgresError> {
async fn list_dbs(client: &mut Client) -> Result<Vec<String>> {
// `pg_database.datconnlimit = -2` means that the database is in the
// invalid state
let databases = client
@@ -39,9 +37,7 @@ async fn list_dbs(client: &mut Client) -> Result<Vec<String>, PostgresError> {
/// Same extension can be installed in multiple databases with different versions,
/// so we report a separate metric (number of databases where it is installed)
/// for each extension version.
pub async fn get_installed_extensions(
mut conf: Config,
) -> Result<InstalledExtensions, PostgresError> {
pub async fn get_installed_extensions(mut conf: Config) -> Result<InstalledExtensions> {
conf.application_name("compute_ctl:get_installed_extensions");
let databases: Vec<String> = {
let (mut client, connection) = conf.connect(NoTls).await?;
@@ -120,7 +116,3 @@ pub async fn get_installed_extensions(
extensions: extensions_map.into_values().collect(),
})
}
pub fn initialize_metrics() {
Lazy::force(&INSTALLED_EXTENSIONS);
}

View File

@@ -4,7 +4,6 @@
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod checker;
pub mod communicator_socket_client;
pub mod config;
pub mod configurator;
pub mod http;
@@ -13,10 +12,8 @@ pub mod logger;
pub mod catalog;
pub mod compute;
pub mod compute_prewarm;
pub mod compute_promote;
pub mod disk_quota;
pub mod extension_server;
pub mod hadron_metrics;
pub mod installed_extensions;
pub mod local_proxy;
pub mod lsn_lease;
@@ -25,7 +22,6 @@ mod migration;
pub mod monitor;
pub mod params;
pub mod pg_helpers;
pub mod pg_isready;
pub mod pgbouncer;
pub mod rsyslog;
pub mod spec;

View File

@@ -1,10 +1,7 @@
use std::collections::HashMap;
use std::sync::{LazyLock, RwLock};
use tracing::Subscriber;
use tracing::info;
use tracing_appender;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::prelude::*;
use tracing_subscriber::{fmt, layer::SubscriberExt, registry::LookupSpan};
/// Initialize logging to stderr, and OpenTelemetry tracing and exporter.
///
@@ -16,63 +13,31 @@ use tracing_subscriber::{fmt, layer::SubscriberExt, registry::LookupSpan};
/// set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`. See
/// `tracing-utils` package description.
///
pub fn init_tracing_and_logging(
default_log_level: &str,
log_dir_opt: &Option<String>,
) -> anyhow::Result<(
Option<tracing_utils::Provider>,
Option<tracing_appender::non_blocking::WorkerGuard>,
)> {
pub async fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result<()> {
// Initialize Logging
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_log_level));
// Standard output streams
let fmt_layer = tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_target(false)
.with_writer(std::io::stderr);
// Logs with file rotation. Files in `$log_dir/pgcctl.yyyy-MM-dd`
let (json_to_file_layer, _file_logs_guard) = if let Some(log_dir) = log_dir_opt {
std::fs::create_dir_all(log_dir)?;
let file_logs_appender = tracing_appender::rolling::RollingFileAppender::builder()
.rotation(tracing_appender::rolling::Rotation::DAILY)
.filename_prefix("pgcctl")
// Lib appends to existing files, so we will keep files for up to 2 days even on restart loops.
// At minimum, log-daemon will have 1 day to detect and upload a file (if created right before midnight).
.max_log_files(2)
.build(log_dir)
.expect("Initializing rolling file appender should succeed");
let (file_logs_writer, _file_logs_guard) =
tracing_appender::non_blocking(file_logs_appender);
let json_to_file_layer = tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_target(false)
.event_format(PgJsonLogShapeFormatter)
.with_writer(file_logs_writer);
(Some(json_to_file_layer), Some(_file_logs_guard))
} else {
(None, None)
};
// Initialize OpenTelemetry
let provider =
tracing_utils::init_tracing("compute_ctl", tracing_utils::ExportConfig::default());
let otlp_layer = provider.as_ref().map(tracing_utils::layer);
let otlp_layer =
tracing_utils::init_tracing("compute_ctl", tracing_utils::ExportConfig::default()).await;
// Put it all together
tracing_subscriber::registry()
.with(env_filter)
.with(otlp_layer)
.with(fmt_layer)
.with(json_to_file_layer)
.init();
tracing::info!("logging and tracing started");
utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
Ok((provider, _file_logs_guard))
Ok(())
}
/// Replace all newline characters with a special character to make it
@@ -127,157 +92,3 @@ pub fn startup_context_from_env() -> Option<opentelemetry::Context> {
None
}
}
/// Track relevant id's
const UNKNOWN_IDS: &str = r#""pg_instance_id": "", "pg_compute_id": """#;
static IDS: LazyLock<RwLock<String>> = LazyLock::new(|| RwLock::new(UNKNOWN_IDS.to_string()));
pub fn update_ids(instance_id: &Option<String>, compute_id: &Option<String>) -> anyhow::Result<()> {
let ids = format!(
r#""pg_instance_id": "{}", "pg_compute_id": "{}""#,
instance_id.as_ref().map(|s| s.as_str()).unwrap_or_default(),
compute_id.as_ref().map(|s| s.as_str()).unwrap_or_default()
);
let mut guard = IDS
.write()
.map_err(|e| anyhow::anyhow!("Log set id's rwlock poisoned: {}", e))?;
*guard = ids;
Ok(())
}
/// Massage compute_ctl logs into PG json log shape so we can use the same Lumberjack setup.
struct PgJsonLogShapeFormatter;
impl<S, N> fmt::format::FormatEvent<S, N> for PgJsonLogShapeFormatter
where
S: Subscriber + for<'a> LookupSpan<'a>,
N: for<'a> fmt::format::FormatFields<'a> + 'static,
{
fn format_event(
&self,
ctx: &fmt::FmtContext<'_, S, N>,
mut writer: fmt::format::Writer<'_>,
event: &tracing::Event<'_>,
) -> std::fmt::Result {
// Format values from the event's metadata, and open message string
let metadata = event.metadata();
{
let ids_guard = IDS.read();
let ids = ids_guard
.as_ref()
.map(|guard| guard.as_str())
// Surpress so that we don't lose all uploaded/ file logs if something goes super wrong. We would notice the missing id's.
.unwrap_or(UNKNOWN_IDS);
write!(
&mut writer,
r#"{{"timestamp": "{}", "error_severity": "{}", "file_name": "{}", "backend_type": "compute_ctl_self", {}, "message": "#,
chrono::Utc::now().format("%Y-%m-%d %H:%M:%S%.3f GMT"),
metadata.level(),
metadata.target(),
ids
)?;
}
let mut message = String::new();
let message_writer = fmt::format::Writer::new(&mut message);
// Gather the message
ctx.field_format().format_fields(message_writer, event)?;
// TODO: any better options than to copy-paste this OSS span formatter?
// impl<S, N, T> FormatEvent<S, N> for Format<Full, T>
// https://docs.rs/tracing-subscriber/latest/tracing_subscriber/fmt/trait.FormatEvent.html#impl-FormatEvent%3CS,+N%3E-for-Format%3CFull,+T%3E
// write message, close bracket, and new line
writeln!(writer, "{}}}", serde_json::to_string(&message).unwrap())
}
}
#[cfg(feature = "testing")]
#[cfg(test)]
mod test {
use super::*;
use std::{cell::RefCell, io};
// Use thread_local! instead of Mutex for test isolation
thread_local! {
static WRITER_OUTPUT: RefCell<String> = const { RefCell::new(String::new()) };
}
#[derive(Clone, Default)]
struct StaticStringWriter;
impl io::Write for StaticStringWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let output = String::from_utf8(buf.to_vec()).expect("Invalid UTF-8 in test output");
WRITER_OUTPUT.with(|s| s.borrow_mut().push_str(&output));
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl fmt::MakeWriter<'_> for StaticStringWriter {
type Writer = Self;
fn make_writer(&self) -> Self::Writer {
Self
}
}
#[test]
fn test_log_pg_json_shape_formatter() {
// Use a scoped subscriber to prevent global state pollution
let subscriber = tracing_subscriber::registry().with(
tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_target(false)
.event_format(PgJsonLogShapeFormatter)
.with_writer(StaticStringWriter),
);
let _ = update_ids(&Some("000".to_string()), &Some("111".to_string()));
// Clear any previous test state
WRITER_OUTPUT.with(|s| s.borrow_mut().clear());
let messages = [
"test message",
r#"json escape check: name="BatchSpanProcessor.Flush.ExportError" reason="Other(reqwest::Error { kind: Request, url: \"http://localhost:4318/v1/traces\", source: hyper_
util::client::legacy::Error(Connect, ConnectError(\"tcp connect error\", Os { code: 111, kind: ConnectionRefused, message: \"Connection refused\" })) })" Failed during the export process"#,
];
tracing::subscriber::with_default(subscriber, || {
for message in messages {
tracing::info!(message);
}
});
tracing::info!("not test message");
// Get captured output
let output = WRITER_OUTPUT.with(|s| s.borrow().clone());
let json_strings: Vec<&str> = output.lines().collect();
assert_eq!(
json_strings.len(),
messages.len(),
"Log didn't have the expected number of json strings."
);
let json_string_shape_regex = regex::Regex::new(
r#"\{"timestamp": "\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} GMT", "error_severity": "INFO", "file_name": ".+", "backend_type": "compute_ctl_self", "pg_instance_id": "000", "pg_compute_id": "111", "message": ".+"\}"#
).unwrap();
for (i, expected_message) in messages.iter().enumerate() {
let json_string = json_strings[i];
assert!(
json_string_shape_regex.is_match(json_string),
"Json log didn't match expected pattern:\n{json_string}",
);
let parsed_json: serde_json::Value = serde_json::from_str(json_string).unwrap();
let actual_message = parsed_json["message"].as_str().unwrap();
assert_eq!(*expected_message, actual_message);
}
}
}

View File

@@ -4,8 +4,7 @@ use std::thread;
use std::time::{Duration, SystemTime};
use anyhow::{Result, bail};
use compute_api::spec::{ComputeMode, PageserverProtocol};
use itertools::Itertools as _;
use compute_api::spec::{ComputeMode, PageserverConnectionInfo};
use pageserver_page_api as page_api;
use postgres::{NoTls, SimpleQueryMessage};
use tracing::{info, warn};
@@ -78,17 +77,16 @@ fn acquire_lsn_lease_with_retry(
loop {
// Note: List of pageservers is dynamic, need to re-read configs before each attempt.
let (connstrings, auth) = {
let (conninfo, auth) = {
let state = compute.state.lock().unwrap();
let spec = state.pspec.as_ref().expect("spec must be set");
(
spec.pageserver_connstr.clone(),
spec.pageserver_conninfo.clone(),
spec.storage_auth_token.clone(),
)
};
let result =
try_acquire_lsn_lease(&connstrings, auth.as_deref(), tenant_id, timeline_id, lsn);
let result = try_acquire_lsn_lease(conninfo, auth.as_deref(), tenant_id, timeline_id, lsn);
match result {
Ok(Some(res)) => {
return Ok(res);
@@ -112,17 +110,16 @@ fn acquire_lsn_lease_with_retry(
/// Tries to acquire LSN leases on all Pageserver shards.
fn try_acquire_lsn_lease(
connstrings: &str,
conninfo: PageserverConnectionInfo,
auth: Option<&str>,
tenant_id: TenantId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Result<Option<SystemTime>> {
let connstrings = connstrings.split(',').collect_vec();
let shard_count = connstrings.len();
let shard_count = conninfo.shards.len();
let mut leases = Vec::new();
for (shard_number, &connstring) in connstrings.iter().enumerate() {
for (shard_number, shard) in conninfo.shards.into_iter() {
let tenant_shard_id = match shard_count {
0 | 1 => TenantShardId::unsharded(tenant_id),
shard_count => TenantShardId {
@@ -132,13 +129,22 @@ fn try_acquire_lsn_lease(
},
};
let lease = match PageserverProtocol::from_connstring(connstring)? {
PageserverProtocol::Libpq => {
acquire_lsn_lease_libpq(connstring, auth, tenant_shard_id, timeline_id, lsn)?
}
PageserverProtocol::Grpc => {
acquire_lsn_lease_grpc(connstring, auth, tenant_shard_id, timeline_id, lsn)?
}
let lease = if conninfo.prefer_grpc {
acquire_lsn_lease_grpc(
&shard.grpc_url.unwrap(),
auth,
tenant_shard_id,
timeline_id,
lsn,
)?
} else {
acquire_lsn_lease_libpq(
&shard.libpq_url.unwrap(),
auth,
tenant_shard_id,
timeline_id,
lsn,
)?
};
leases.push(lease);
}

View File

@@ -105,14 +105,6 @@ pub(crate) static LFC_PREWARMS: Lazy<IntCounter> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static LFC_PREWARM_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"compute_ctl_lfc_prewarm_errors_total",
"Total number of LFC prewarm errors",
)
.expect("failed to define a metric")
});
pub(crate) static LFC_OFFLOADS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"compute_ctl_lfc_offloads_total",
@@ -121,14 +113,6 @@ pub(crate) static LFC_OFFLOADS: Lazy<IntCounter> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static LFC_OFFLOAD_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"compute_ctl_lfc_offload_errors_total",
"Total number of LFC offload errors",
)
.expect("failed to define a metric")
});
pub fn collect() -> Vec<MetricFamily> {
let mut metrics = COMPUTE_CTL_UP.collect();
metrics.extend(INSTALLED_EXTENSIONS.collect());
@@ -139,8 +123,6 @@ pub fn collect() -> Vec<MetricFamily> {
metrics.extend(PG_CURR_DOWNTIME_MS.collect());
metrics.extend(PG_TOTAL_DOWNTIME_MS.collect());
metrics.extend(LFC_PREWARMS.collect());
metrics.extend(LFC_PREWARM_ERRORS.collect());
metrics.extend(LFC_OFFLOADS.collect());
metrics.extend(LFC_OFFLOAD_ERRORS.collect());
metrics
}

View File

@@ -9,20 +9,15 @@ use crate::metrics::DB_MIGRATION_FAILED;
pub(crate) struct MigrationRunner<'m> {
client: &'m mut Client,
migrations: &'m [&'m str],
lakebase_mode: bool,
}
impl<'m> MigrationRunner<'m> {
/// Create a new migration runner
pub fn new(client: &'m mut Client, migrations: &'m [&'m str], lakebase_mode: bool) -> Self {
pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
// The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
assert!(migrations.len() + 1 < i64::MAX as usize);
Self {
client,
migrations,
lakebase_mode,
}
Self { client, migrations }
}
/// Get the current value neon_migration.migration_id
@@ -135,13 +130,8 @@ impl<'m> MigrationRunner<'m> {
// ID is also the next index
let migration_id = (current_migration + 1) as i64;
let migration = self.migrations[current_migration];
let migration = if self.lakebase_mode {
migration.replace("neon_superuser", "databricks_superuser")
} else {
migration.to_string()
};
match Self::run_migration(self.client, migration_id, &migration).await {
match Self::run_migration(self.client, migration_id, migration).await {
Ok(_) => {
info!("Finished migration id={}", migration_id);
}

View File

@@ -1 +0,0 @@
ALTER ROLE {privileged_role_name} BYPASSRLS;

View File

@@ -0,0 +1 @@
ALTER ROLE neon_superuser BYPASSRLS;

View File

@@ -1,21 +1,8 @@
-- On December 8th, 2023, an engineering escalation (INC-110) was opened after
-- it was found that BYPASSRLS was being applied to all roles.
--
-- PR that introduced the issue: https://github.com/neondatabase/neon/pull/5657
-- Subsequent commit on main: https://github.com/neondatabase/neon/commit/ad99fa5f0393e2679e5323df653c508ffa0ac072
--
-- NOBYPASSRLS and INHERIT are the defaults for a Postgres role, but because it
-- isn't easy to know if a Postgres cluster is affected by the issue, we need to
-- keep the migration around for a long time, if not indefinitely, so any
-- cluster can be fixed.
--
-- Branching is the gift that keeps on giving...
DO $$
DECLARE
role_name text;
BEGIN
FOR role_name IN SELECT rolname FROM pg_roles WHERE pg_has_role(rolname, '{privileged_role_name}', 'member')
FOR role_name IN SELECT rolname FROM pg_roles WHERE pg_has_role(rolname, 'neon_superuser', 'member')
LOOP
RAISE NOTICE 'EXECUTING ALTER ROLE % INHERIT', quote_ident(role_name);
EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' INHERIT';
@@ -23,7 +10,7 @@ BEGIN
FOR role_name IN SELECT rolname FROM pg_roles
WHERE
NOT pg_has_role(rolname, '{privileged_role_name}', 'member') AND NOT starts_with(rolname, 'pg_')
NOT pg_has_role(rolname, 'neon_superuser', 'member') AND NOT starts_with(rolname, 'pg_')
LOOP
RAISE NOTICE 'EXECUTING ALTER ROLE % NOBYPASSRLS', quote_ident(role_name);
EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' NOBYPASSRLS';

View File

@@ -1,6 +1,6 @@
DO $$
BEGIN
IF (SELECT setting::numeric >= 160000 FROM pg_settings WHERE name = 'server_version_num') THEN
EXECUTE 'GRANT pg_create_subscription TO {privileged_role_name}';
EXECUTE 'GRANT pg_create_subscription TO neon_superuser';
END IF;
END $$;

View File

@@ -0,0 +1 @@
GRANT pg_monitor TO neon_superuser WITH ADMIN OPTION;

View File

@@ -1 +0,0 @@
GRANT pg_monitor TO {privileged_role_name} WITH ADMIN OPTION;

View File

@@ -1,4 +1,4 @@
-- SKIP: Deemed insufficient for allowing relations created by extensions to be
-- interacted with by {privileged_role_name} without permission issues.
-- interacted with by neon_superuser without permission issues.
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO {privileged_role_name};
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO neon_superuser;

View File

@@ -1,4 +1,4 @@
-- SKIP: Deemed insufficient for allowing relations created by extensions to be
-- interacted with by {privileged_role_name} without permission issues.
-- interacted with by neon_superuser without permission issues.
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO {privileged_role_name};
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO neon_superuser;

View File

@@ -1,3 +1,3 @@
-- SKIP: Moved inline to the handle_grants() functions.
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO {privileged_role_name} WITH GRANT OPTION;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO neon_superuser WITH GRANT OPTION;

View File

@@ -1,3 +1,3 @@
-- SKIP: Moved inline to the handle_grants() functions.
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO {privileged_role_name} WITH GRANT OPTION;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO neon_superuser WITH GRANT OPTION;

View File

@@ -1,7 +1,7 @@
DO $$
BEGIN
IF (SELECT setting::numeric >= 160000 FROM pg_settings WHERE name = 'server_version_num') THEN
EXECUTE 'GRANT EXECUTE ON FUNCTION pg_export_snapshot TO {privileged_role_name}';
EXECUTE 'GRANT EXECUTE ON FUNCTION pg_log_standby_snapshot TO {privileged_role_name}';
EXECUTE 'GRANT EXECUTE ON FUNCTION pg_export_snapshot TO neon_superuser';
EXECUTE 'GRANT EXECUTE ON FUNCTION pg_log_standby_snapshot TO neon_superuser';
END IF;
END $$;

View File

@@ -0,0 +1 @@
GRANT EXECUTE ON FUNCTION pg_show_replication_origin_status TO neon_superuser;

View File

@@ -1 +0,0 @@
GRANT EXECUTE ON FUNCTION pg_show_replication_origin_status TO {privileged_role_name};

View File

@@ -1 +0,0 @@
GRANT pg_signal_backend TO {privileged_role_name} WITH ADMIN OPTION;

View File

@@ -7,17 +7,13 @@ BEGIN
INTO monitor
FROM pg_auth_members
WHERE roleid = 'pg_monitor'::regrole
AND member = 'neon_superuser'::regrole;
AND member = 'pg_monitor'::regrole;
IF monitor IS NULL THEN
RAISE EXCEPTION 'no entry in pg_auth_members for neon_superuser and pg_monitor';
END IF;
IF monitor.admin IS NULL OR NOT monitor.member THEN
IF NOT monitor.member THEN
RAISE EXCEPTION 'neon_superuser is not a member of pg_monitor';
END IF;
IF monitor.admin IS NULL OR NOT monitor.admin THEN
IF NOT monitor.admin THEN
RAISE EXCEPTION 'neon_superuser cannot grant pg_monitor';
END IF;
END $$;

View File

@@ -1,23 +0,0 @@
DO $$
DECLARE
signal_backend record;
BEGIN
SELECT pg_has_role('neon_superuser', 'pg_signal_backend', 'member') AS member,
admin_option AS admin
INTO signal_backend
FROM pg_auth_members
WHERE roleid = 'pg_signal_backend'::regrole
AND member = 'neon_superuser'::regrole;
IF signal_backend IS NULL THEN
RAISE EXCEPTION 'no entry in pg_auth_members for neon_superuser and pg_signal_backend';
END IF;
IF signal_backend.member IS NULL OR NOT signal_backend.member THEN
RAISE EXCEPTION 'neon_superuser is not a member of pg_signal_backend';
END IF;
IF signal_backend.admin IS NULL OR NOT signal_backend.admin THEN
RAISE EXCEPTION 'neon_superuser cannot grant pg_signal_backend';
END IF;
END $$;

View File

@@ -11,7 +11,6 @@ use tracing::{Level, error, info, instrument, span};
use crate::compute::ComputeNode;
use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};
const PG_DEFAULT_INIT_TIMEOUIT: Duration = Duration::from_secs(60);
const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
/// Struct to store runtime state of the compute monitor thread.
@@ -85,8 +84,7 @@ impl ComputeMonitor {
if matches!(
compute_status,
ComputeStatus::Terminated
| ComputeStatus::TerminationPendingFast
| ComputeStatus::TerminationPendingImmediate
| ComputeStatus::TerminationPending { .. }
| ComputeStatus::Failed
) {
info!(
@@ -353,47 +351,13 @@ impl ComputeMonitor {
// Hang on condition variable waiting until the compute status is `Running`.
fn wait_for_postgres_start(compute: &ComputeNode) {
let mut state = compute.state.lock().unwrap();
let pg_init_timeout = compute
.params
.pg_init_timeout
.unwrap_or(PG_DEFAULT_INIT_TIMEOUIT);
while state.status != ComputeStatus::Running {
info!("compute is not running, waiting before monitoring activity");
if !compute.params.lakebase_mode {
state = compute.state_changed.wait(state).unwrap();
state = compute.state_changed.wait(state).unwrap();
if state.status == ComputeStatus::Running {
break;
}
continue;
if state.status == ComputeStatus::Running {
break;
}
if state.pg_start_time.is_some()
&& Utc::now()
.signed_duration_since(state.pg_start_time.unwrap())
.to_std()
.unwrap_or_default()
> pg_init_timeout
{
// If Postgres isn't up and running with working PS/SK connections within POSTGRES_STARTUP_TIMEOUT, it is
// possible that we started Postgres with a wrong spec (so it is talking to the wrong PS/SK nodes). To prevent
// deadends we simply exit (panic) the compute node so it can restart with the latest spec.
//
// NB: We skip this check if we have not attempted to start PG yet (indicated by state.pg_start_up == None).
// This is to make sure the more appropriate errors are surfaced if we encounter issues before we even attempt
// to start PG (e.g., if we can't pull the spec, can't sync safekeepers, or can't get the basebackup).
error!(
"compute did not enter Running state in {} seconds, exiting",
pg_init_timeout.as_secs()
);
std::process::exit(1);
}
state = compute
.state_changed
.wait_timeout(state, Duration::from_secs(5))
.unwrap()
.0;
}
}

View File

@@ -11,9 +11,7 @@ use std::time::{Duration, Instant};
use anyhow::{Result, bail};
use compute_api::responses::TlsConfig;
use compute_api::spec::{
Database, DatabricksSettings, GenericOption, GenericOptions, PgIdent, Role,
};
use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role};
use futures::StreamExt;
use indexmap::IndexMap;
use ini::Ini;
@@ -186,42 +184,6 @@ impl DatabaseExt for Database {
}
}
pub trait DatabricksSettingsExt {
fn as_pg_settings(&self) -> String;
}
impl DatabricksSettingsExt for DatabricksSettings {
fn as_pg_settings(&self) -> String {
// Postgres GUCs rendered from DatabricksSettings
vec![
// ssl_ca_file
Some(format!(
"ssl_ca_file = '{}'",
self.pg_compute_tls_settings.ca_file
)),
// [Optional] databricks.workspace_url
Some(format!(
"databricks.workspace_url = '{}'",
&self.databricks_workspace_host
)),
// todo(vikas.jain): these are not required anymore as they are moved to static
// conf but keeping these to avoid image mismatch between hcc and pg.
// Once hcc and pg are in sync, we can remove these.
//
// databricks.enable_databricks_identity_login
Some("databricks.enable_databricks_identity_login = true".to_string()),
// databricks.enable_sql_restrictions
Some("databricks.enable_sql_restrictions = true".to_string()),
]
.into_iter()
// Removes `None`s
.flatten()
.collect::<Vec<String>>()
.join("\n")
+ "\n"
}
}
/// Generic trait used to provide quoting / encoding for strings used in the
/// Postgres SQL queries and DATABASE_URL.
pub trait Escaping {

View File

@@ -1,30 +0,0 @@
use anyhow::{Context, anyhow};
// Run `/usr/local/bin/pg_isready -p {port}`
// Check the connectivity of PG
// Success means PG is listening on the port and accepting connections
// Note that PG does not need to authenticate the connection, nor reserve a connection quota for it.
// See https://www.postgresql.org/docs/current/app-pg-isready.html
pub fn pg_isready(bin: &str, port: u16) -> anyhow::Result<()> {
let child_result = std::process::Command::new(bin)
.arg("-p")
.arg(port.to_string())
.spawn();
child_result
.context("spawn() failed")
.and_then(|mut child| child.wait().context("wait() failed"))
.and_then(|status| match status.success() {
true => Ok(()),
false => Err(anyhow!("process exited with {status}")),
})
// wrap any prior error with the overall context that we couldn't run the command
.with_context(|| format!("could not run `{bin} --port {port}`"))
}
// It's safe to assume pg_isready is under the same directory with postgres,
// because it is a PG util bin installed along with postgres
pub fn get_pg_isready_bin(pgbin: &str) -> String {
let split = pgbin.split("/").collect::<Vec<&str>>();
split[0..split.len() - 1].join("/") + "/pg_isready"
}

View File

@@ -1,6 +1,4 @@
use std::fs::File;
use std::fs::{self, Permissions};
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use anyhow::{Result, anyhow, bail};
@@ -11,7 +9,6 @@ use reqwest::StatusCode;
use tokio_postgres::Client;
use tracing::{error, info, instrument};
use crate::compute::ComputeNodeParams;
use crate::config;
use crate::metrics::{CPLANE_REQUESTS_TOTAL, CPlaneRequestRPC, UNKNOWN_HTTP_STATUS};
use crate::migration::MigrationRunner;
@@ -135,25 +132,10 @@ pub fn get_config_from_control_plane(base_uri: &str, compute_id: &str) -> Result
}
/// Check `pg_hba.conf` and update if needed to allow external connections.
pub fn update_pg_hba(pgdata_path: &Path, databricks_pg_hba: Option<&String>) -> Result<()> {
pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
// XXX: consider making it a part of config.json
let pghba_path = pgdata_path.join("pg_hba.conf");
// Update pg_hba to contains databricks specfic settings before adding neon settings
// PG uses the first record that matches to perform authentication, so we need to have
// our rules before the default ones from neon.
// See https://www.postgresql.org/docs/16/auth-pg-hba-conf.html
if let Some(databricks_pg_hba) = databricks_pg_hba {
if config::line_in_file(
&pghba_path,
&format!("include_if_exists {}\n", *databricks_pg_hba),
)? {
info!("updated pg_hba.conf to include databricks_pg_hba.conf");
} else {
info!("pg_hba.conf already included databricks_pg_hba.conf");
}
}
if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
info!("updated pg_hba.conf to allow external connections");
} else {
@@ -163,59 +145,6 @@ pub fn update_pg_hba(pgdata_path: &Path, databricks_pg_hba: Option<&String>) ->
Ok(())
}
/// Check `pg_ident.conf` and update if needed to allow databricks config.
pub fn update_pg_ident(pgdata_path: &Path, databricks_pg_ident: Option<&String>) -> Result<()> {
info!("checking pg_ident.conf");
let pghba_path = pgdata_path.join("pg_ident.conf");
// Update pg_ident to contains databricks specfic settings
if let Some(databricks_pg_ident) = databricks_pg_ident {
if config::line_in_file(
&pghba_path,
&format!("include_if_exists {}\n", *databricks_pg_ident),
)? {
info!("updated pg_ident.conf to include databricks_pg_ident.conf");
} else {
info!("pg_ident.conf already included databricks_pg_ident.conf");
}
}
Ok(())
}
/// Copy tls key_file and cert_file from k8s secret mount directory
/// to pgdata and set private key file permissions as expected by Postgres.
/// See this doc for expected permission <https://www.postgresql.org/docs/current/ssl-tcp.html>
/// K8s secrets mount on dblet does not honor permission and ownership
/// specified in the Volume or VolumeMount. So we need to explicitly copy the file and set the permissions.
pub fn copy_tls_certificates(
key_file: &String,
cert_file: &String,
pgdata_path: &Path,
) -> Result<()> {
let files = [cert_file, key_file];
for file in files.iter() {
let source = Path::new(file);
let dest = pgdata_path.join(source.file_name().unwrap());
if !dest.exists() {
std::fs::copy(source, &dest)?;
info!(
"Copying tls file: {} to {}",
&source.display(),
&dest.display()
);
}
if *file == key_file {
// Postgres requires private key to be readable only by the owner by having
// chmod 600 permissions.
let permissions = Permissions::from_mode(0o600);
fs::set_permissions(&dest, permissions)?;
info!("Setting permission on {}.", &dest.display());
}
}
Ok(())
}
/// Create a standby.signal file
pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
// XXX: consider making it a part of config.json
@@ -240,11 +169,7 @@ pub async fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
}
#[instrument(skip_all)]
pub async fn handle_migrations(
params: ComputeNodeParams,
client: &mut Client,
lakebase_mode: bool,
) -> Result<()> {
pub async fn handle_migrations(client: &mut Client) -> Result<()> {
info!("handle migrations");
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
@@ -253,62 +178,28 @@ pub async fn handle_migrations(
// Add new migrations in numerical order.
let migrations = [
&format!(
include_str!("./migrations/0001-add_bypass_rls_to_privileged_role.sql"),
privileged_role_name = params.privileged_role_name
include_str!("./migrations/0001-neon_superuser_bypass_rls.sql"),
include_str!("./migrations/0002-alter_roles.sql"),
include_str!("./migrations/0003-grant_pg_create_subscription_to_neon_superuser.sql"),
include_str!("./migrations/0004-grant_pg_monitor_to_neon_superuser.sql"),
include_str!("./migrations/0005-grant_all_on_tables_to_neon_superuser.sql"),
include_str!("./migrations/0006-grant_all_on_sequences_to_neon_superuser.sql"),
include_str!(
"./migrations/0007-grant_all_on_tables_to_neon_superuser_with_grant_option.sql"
),
&format!(
include_str!("./migrations/0002-alter_roles.sql"),
privileged_role_name = params.privileged_role_name
),
&format!(
include_str!("./migrations/0003-grant_pg_create_subscription_to_privileged_role.sql"),
privileged_role_name = params.privileged_role_name
),
&format!(
include_str!("./migrations/0004-grant_pg_monitor_to_privileged_role.sql"),
privileged_role_name = params.privileged_role_name
),
&format!(
include_str!("./migrations/0005-grant_all_on_tables_to_privileged_role.sql"),
privileged_role_name = params.privileged_role_name
),
&format!(
include_str!("./migrations/0006-grant_all_on_sequences_to_privileged_role.sql"),
privileged_role_name = params.privileged_role_name
),
&format!(
include_str!(
"./migrations/0007-grant_all_on_tables_with_grant_option_to_privileged_role.sql"
),
privileged_role_name = params.privileged_role_name
),
&format!(
include_str!(
"./migrations/0008-grant_all_on_sequences_with_grant_option_to_privileged_role.sql"
),
privileged_role_name = params.privileged_role_name
include_str!(
"./migrations/0008-grant_all_on_sequences_to_neon_superuser_with_grant_option.sql"
),
include_str!("./migrations/0009-revoke_replication_for_previously_allowed_roles.sql"),
&format!(
include_str!(
"./migrations/0010-grant_snapshot_synchronization_funcs_to_privileged_role.sql"
),
privileged_role_name = params.privileged_role_name
include_str!(
"./migrations/0010-grant_snapshot_synchronization_funcs_to_neon_superuser.sql"
),
&format!(
include_str!(
"./migrations/0011-grant_pg_show_replication_origin_status_to_privileged_role.sql"
),
privileged_role_name = params.privileged_role_name
),
&format!(
include_str!("./migrations/0012-grant_pg_signal_backend_to_privileged_role.sql"),
privileged_role_name = params.privileged_role_name
include_str!(
"./migrations/0011-grant_pg_show_replication_origin_status_to_neon_superuser.sql"
),
];
MigrationRunner::new(client, &migrations, lakebase_mode)
MigrationRunner::new(client, &migrations)
.run_migrations()
.await?;

View File

@@ -13,14 +13,14 @@ use tokio_postgres::Client;
use tokio_postgres::error::SqlState;
use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
use crate::compute::{ComputeNode, ComputeNodeParams, ComputeState};
use crate::compute::{ComputeNode, ComputeState};
use crate::pg_helpers::{
DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, get_existing_dbs_async,
get_existing_roles_async,
};
use crate::spec_apply::ApplySpecPhase::{
CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreatePgauditExtension,
CreatePgauditlogtofileExtension, CreatePrivilegedRole, CreateSchemaNeon,
CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreateNeonSuperuser,
CreatePgauditExtension, CreatePgauditlogtofileExtension, CreateSchemaNeon,
DisablePostgresDBPgAudit, DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions,
HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles,
RunInEachDatabase,
@@ -49,7 +49,6 @@ impl ComputeNode {
// Proceed with post-startup configuration. Note, that order of operations is important.
let client = Self::get_maintenance_client(&conf).await?;
let spec = spec.clone();
let params = Arc::new(self.params.clone());
let databases = get_existing_dbs_async(&client).await?;
let roles = get_existing_roles_async(&client)
@@ -158,7 +157,6 @@ impl ComputeNode {
let conf = Arc::new(conf);
let fut = Self::apply_spec_sql_db(
params.clone(),
spec.clone(),
conf,
ctx.clone(),
@@ -187,7 +185,7 @@ impl ComputeNode {
}
for phase in [
CreatePrivilegedRole,
CreateNeonSuperuser,
DropInvalidDatabases,
RenameRoles,
CreateAndAlterRoles,
@@ -197,7 +195,6 @@ impl ComputeNode {
] {
info!("Applying phase {:?}", &phase);
apply_operations(
params.clone(),
spec.clone(),
ctx.clone(),
jwks_roles.clone(),
@@ -246,7 +243,6 @@ impl ComputeNode {
}
let fut = Self::apply_spec_sql_db(
params.clone(),
spec.clone(),
conf,
ctx.clone(),
@@ -297,7 +293,6 @@ impl ComputeNode {
for phase in phases {
debug!("Applying phase {:?}", &phase);
apply_operations(
params.clone(),
spec.clone(),
ctx.clone(),
jwks_roles.clone(),
@@ -318,9 +313,7 @@ impl ComputeNode {
/// May opt to not connect to databases that don't have any scheduled
/// operations. The function is concurrency-controlled with the provided
/// semaphore. The caller has to make sure the semaphore isn't exhausted.
#[allow(clippy::too_many_arguments)] // TODO: needs bigger refactoring
async fn apply_spec_sql_db(
params: Arc<ComputeNodeParams>,
spec: Arc<ComputeSpec>,
conf: Arc<tokio_postgres::Config>,
ctx: Arc<tokio::sync::RwLock<MutableApplyContext>>,
@@ -335,7 +328,6 @@ impl ComputeNode {
for subphase in subphases {
apply_operations(
params.clone(),
spec.clone(),
ctx.clone(),
jwks_roles.clone(),
@@ -411,8 +403,7 @@ impl ComputeNode {
.map(|limit| match limit {
0..10 => limit,
10..30 => 10,
30..300 => limit / 3,
300.. => 100,
30.. => limit / 3,
})
// If we didn't find max_connections, default to 10 concurrent connections.
.unwrap_or(10)
@@ -476,7 +467,7 @@ pub enum PerDatabasePhase {
#[derive(Clone, Debug)]
pub enum ApplySpecPhase {
CreatePrivilegedRole,
CreateNeonSuperuser,
DropInvalidDatabases,
RenameRoles,
CreateAndAlterRoles,
@@ -519,7 +510,6 @@ pub struct MutableApplyContext {
/// - No timeouts have (yet) been implemented.
/// - The caller is responsible for limiting and/or applying concurrency.
pub async fn apply_operations<'a, Fut, F>(
params: Arc<ComputeNodeParams>,
spec: Arc<ComputeSpec>,
ctx: Arc<RwLock<MutableApplyContext>>,
jwks_roles: Arc<HashSet<String>>,
@@ -537,7 +527,7 @@ where
debug!("Processing phase {:?}", &apply_spec_phase);
let ctx = ctx;
let mut ops = get_operations(&params, &spec, &ctx, &jwks_roles, &apply_spec_phase)
let mut ops = get_operations(&spec, &ctx, &jwks_roles, &apply_spec_phase)
.await?
.peekable();
@@ -598,18 +588,14 @@ where
/// sort/merge/batch execution, but for now this is a nice way to improve
/// batching behavior of the commands.
async fn get_operations<'a>(
params: &'a ComputeNodeParams,
spec: &'a ComputeSpec,
ctx: &'a RwLock<MutableApplyContext>,
jwks_roles: &'a HashSet<String>,
apply_spec_phase: &'a ApplySpecPhase,
) -> Result<Box<dyn Iterator<Item = Operation> + 'a + Send>> {
match apply_spec_phase {
ApplySpecPhase::CreatePrivilegedRole => Ok(Box::new(once(Operation {
query: format!(
include_str!("sql/create_privileged_role.sql"),
privileged_role_name = params.privileged_role_name
),
ApplySpecPhase::CreateNeonSuperuser => Ok(Box::new(once(Operation {
query: include_str!("sql/create_neon_superuser.sql").to_string(),
comment: None,
}))),
ApplySpecPhase::DropInvalidDatabases => {
@@ -711,9 +697,8 @@ async fn get_operations<'a>(
None => {
let query = if !jwks_roles.contains(role.name.as_str()) {
format!(
"CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE {} {}",
"CREATE ROLE {} INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser {}",
role.name.pg_quote(),
params.privileged_role_name,
role.to_pg_options(),
)
} else {
@@ -864,9 +849,8 @@ async fn get_operations<'a>(
// ALL PRIVILEGES grants CREATE, CONNECT, and TEMPORARY on the database
// (see https://www.postgresql.org/docs/current/ddl-priv.html)
query: format!(
"GRANT ALL PRIVILEGES ON DATABASE {} TO {}",
db.name.pg_quote(),
params.privileged_role_name
"GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
db.name.pg_quote()
),
comment: None,
},

View File

@@ -0,0 +1,8 @@
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser')
THEN
CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN REPLICATION BYPASSRLS IN ROLE pg_read_all_data, pg_write_all_data;
END IF;
END
$$;

View File

@@ -1,8 +0,0 @@
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = '{privileged_role_name}')
THEN
CREATE ROLE {privileged_role_name} CREATEDB CREATEROLE NOLOGIN REPLICATION BYPASSRLS IN ROLE pg_read_all_data, pg_write_all_data;
END IF;
END
$$;

View File

@@ -8,10 +8,10 @@ code changes locally, but not suitable for running production systems.
## Example: Start with Postgres 16
To create and start a local development environment with Postgres 16, you will need to provide `--pg-version` flag to 2 of the start-up commands.
To create and start a local development environment with Postgres 16, you will need to provide `--pg-version` flag to 3 of the start-up commands.
```shell
cargo neon init
cargo neon init --pg-version 16
cargo neon start
cargo neon tenant create --set-default --pg-version 16
cargo neon endpoint create main --pg-version 16

View File

@@ -16,7 +16,7 @@ use std::time::Duration;
use anyhow::{Context, Result, anyhow, bail};
use clap::Parser;
use compute_api::requests::ComputeClaimsScope;
use compute_api::spec::{ComputeMode, PageserverProtocol};
use compute_api::spec::{ComputeMode, PageserverConnectionInfo, PageserverShardConnectionInfo};
use control_plane::broker::StorageBroker;
use control_plane::endpoint::{ComputeControlPlane, EndpointTerminateMode};
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_ADDR, EndpointStorage};
@@ -407,12 +407,6 @@ struct StorageControllerStartCmdArgs {
help = "Base port for the storage controller instance idenfified by instance-id (defaults to pageserver cplane api)"
)]
base_port: Option<u16>,
#[clap(
long,
help = "Whether the storage controller should handle pageserver-reported local disk loss events."
)]
handle_ps_local_disk_loss: Option<bool>,
}
#[derive(clap::Args)]
@@ -637,10 +631,6 @@ struct EndpointCreateCmdArgs {
help = "Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but useful for tests."
)]
allow_multiple: bool,
/// Only allow changing it on creation
#[clap(long, help = "Name of the privileged role for the endpoint")]
privileged_role_name: Option<String>,
}
#[derive(clap::Args)]
@@ -1490,7 +1480,6 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
args.grpc,
!args.update_catalog,
false,
args.privileged_role_name.clone(),
)?;
}
EndpointCmd::Start(args) => {
@@ -1517,7 +1506,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
let endpoint = cplane
.endpoints
.get(endpoint_id.as_str())
.ok_or_else(|| anyhow!("endpoint {endpoint_id} not found"))?;
.ok_or_else(|| anyhow::anyhow!("endpoint {endpoint_id} not found"))?;
if !args.allow_multiple {
cplane.check_conflicting_endpoints(
@@ -1527,29 +1516,35 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
)?;
}
let (pageservers, stripe_size) = if let Some(pageserver_id) = pageserver_id {
let conf = env.get_pageserver_conf(pageserver_id).unwrap();
// Use gRPC if requested.
let pageserver = if endpoint.grpc {
let grpc_addr = conf.listen_grpc_addr.as_ref().expect("bad config");
let (host, port) = parse_host_port(grpc_addr)?;
let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT);
(PageserverProtocol::Grpc, host, port)
} else {
let (shards, stripe_size) = if let Some(ps_id) = pageserver_id {
let conf = env.get_pageserver_conf(ps_id).unwrap();
let libpq_url = Some({
let (host, port) = parse_host_port(&conf.listen_pg_addr)?;
let port = port.unwrap_or(5432);
(PageserverProtocol::Libpq, host, port)
format!("postgres://no_user@{host}:{port}")
});
let grpc_url = if let Some(grpc_addr) = &conf.listen_grpc_addr {
let (host, port) = parse_host_port(grpc_addr)?;
let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT);
Some(format!("grpc://no_user@{host}:{port}"))
} else {
None
};
let pageserver = PageserverShardConnectionInfo {
libpq_url,
grpc_url,
};
// If caller is telling us what pageserver to use, this is not a tenant which is
// fully managed by storage controller, therefore not sharded.
(vec![pageserver], DEFAULT_STRIPE_SIZE)
(vec![(0, pageserver)], DEFAULT_STRIPE_SIZE)
} else {
// Look up the currently attached location of the tenant, and its striping metadata,
// to pass these on to postgres.
let storage_controller = StorageController::from_env(env);
let locate_result = storage_controller.tenant_locate(endpoint.tenant_id).await?;
let pageservers = futures::future::try_join_all(
locate_result.shards.into_iter().map(|shard| async move {
let shards = futures::future::try_join_all(locate_result.shards.into_iter().map(
|shard| async move {
if let ComputeMode::Static(lsn) = endpoint.mode {
// Initialize LSN leases for static computes.
let conf = env.get_pageserver_conf(shard.node_id).unwrap();
@@ -1561,28 +1556,34 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.await?;
}
let pageserver = if endpoint.grpc {
(
PageserverProtocol::Grpc,
Host::parse(&shard.listen_grpc_addr.expect("no gRPC address"))?,
shard.listen_grpc_port.expect("no gRPC port"),
)
let libpq_host = Host::parse(&shard.listen_pg_addr)?;
let libpq_port = shard.listen_pg_port;
let libpq_url =
Some(format!("postgres://no_user@{libpq_host}:{libpq_port}"));
let grpc_url = if let Some(grpc_host) = shard.listen_grpc_addr {
let grpc_port = shard.listen_grpc_port.expect("no gRPC port");
Some(format!("grpc://no_user@{grpc_host}:{grpc_port}"))
} else {
(
PageserverProtocol::Libpq,
Host::parse(&shard.listen_pg_addr)?,
shard.listen_pg_port,
)
None
};
anyhow::Ok(pageserver)
}),
)
let pageserver = PageserverShardConnectionInfo {
libpq_url,
grpc_url,
};
anyhow::Ok((shard.shard_id.shard_number.0 as u32, pageserver))
},
))
.await?;
let stripe_size = locate_result.shard_params.stripe_size;
(pageservers, stripe_size)
(shards, stripe_size)
};
assert!(!shards.is_empty());
let pageserver_conninfo = PageserverConnectionInfo {
shards: shards.into_iter().collect(),
prefer_grpc: endpoint.grpc,
};
assert!(!pageservers.is_empty());
let ps_conf = env.get_pageserver_conf(DEFAULT_PAGESERVER_ID)?;
let auth_token = if matches!(ps_conf.pg_auth_type, AuthType::NeonJWT) {
@@ -1612,7 +1613,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
endpoint_storage_addr,
safekeepers_generation,
safekeepers,
pageservers,
pageserver_conninfo,
remote_ext_base_url: remote_ext_base_url.clone(),
shard_stripe_size: stripe_size.0 as usize,
create_test_user: args.create_test_user,
@@ -1631,20 +1632,27 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.endpoints
.get(endpoint_id.as_str())
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
let pageservers = if let Some(ps_id) = args.endpoint_pageserver_id {
let shards = if let Some(ps_id) = args.endpoint_pageserver_id {
let conf = env.get_pageserver_conf(ps_id)?;
// Use gRPC if requested.
let pageserver = if endpoint.grpc {
let grpc_addr = conf.listen_grpc_addr.as_ref().expect("bad config");
let (host, port) = parse_host_port(grpc_addr)?;
let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT);
(PageserverProtocol::Grpc, host, port)
} else {
let libpq_url = Some({
let (host, port) = parse_host_port(&conf.listen_pg_addr)?;
let port = port.unwrap_or(5432);
(PageserverProtocol::Libpq, host, port)
format!("postgres://no_user@{host}:{port}")
});
let grpc_url = if let Some(grpc_addr) = &conf.listen_grpc_addr {
let (host, port) = parse_host_port(grpc_addr)?;
let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT);
Some(format!("grpc://no_user@{host}:{port}"))
} else {
None
};
vec![pageserver]
let pageserver = PageserverShardConnectionInfo {
libpq_url,
grpc_url,
};
// If caller is telling us what pageserver to use, this is not a tenant which is
// fully managed by storage controller, therefore not sharded.
vec![(0, pageserver)]
} else {
let storage_controller = StorageController::from_env(env);
storage_controller
@@ -1654,28 +1662,36 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.into_iter()
.map(|shard| {
// Use gRPC if requested.
if endpoint.grpc {
(
PageserverProtocol::Grpc,
Host::parse(&shard.listen_grpc_addr.expect("no gRPC address"))
.expect("bad hostname"),
shard.listen_grpc_port.expect("no gRPC port"),
)
let libpq_host = Host::parse(&shard.listen_pg_addr).expect("bad hostname");
let libpq_port = shard.listen_pg_port;
let libpq_url =
Some(format!("postgres://no_user@{libpq_host}:{libpq_port}"));
let grpc_url = if let Some(grpc_host) = shard.listen_grpc_addr {
let grpc_port = shard.listen_grpc_port.expect("no gRPC port");
Some(format!("grpc://no_user@{grpc_host}:{grpc_port}"))
} else {
(
PageserverProtocol::Libpq,
Host::parse(&shard.listen_pg_addr).expect("bad hostname"),
shard.listen_pg_port,
)
}
None
};
(
shard.shard_id.shard_number.0 as u32,
PageserverShardConnectionInfo {
libpq_url,
grpc_url,
},
)
})
.collect::<Vec<_>>()
};
let pageserver_conninfo = PageserverConnectionInfo {
shards: shards.into_iter().collect(),
prefer_grpc: endpoint.grpc,
};
// If --safekeepers argument is given, use only the listed
// safekeeper nodes; otherwise all from the env.
let safekeepers = parse_safekeepers(&args.safekeepers)?;
endpoint
.reconfigure(Some(pageservers), None, safekeepers, None)
.reconfigure(Some(pageserver_conninfo), None, safekeepers, None)
.await?;
}
EndpointCmd::Stop(args) => {
@@ -1815,7 +1831,6 @@ async fn handle_storage_controller(
instance_id: args.instance_id,
base_port: args.base_port,
start_timeout: args.start_timeout,
handle_ps_local_disk_loss: args.handle_ps_local_disk_loss,
};
if let Err(e) = svc.start(start_args).await {

View File

@@ -36,7 +36,7 @@ impl StorageBroker {
pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> {
let broker = &self.env.broker;
println!("Starting neon broker at {}", broker.client_url());
print!("Starting neon broker at {}", broker.client_url());
let mut args = Vec::new();

View File

@@ -32,8 +32,7 @@
//! config.json - passed to `compute_ctl`
//! pgdata/
//! postgresql.conf - copy of postgresql.conf created by `compute_ctl`
//! neon.signal
//! zenith.signal - copy of neon.signal, for backward compatibility
//! zenith.signal
//! <other PostgreSQL files>
//! ```
//!
@@ -57,14 +56,19 @@ use compute_api::responses::{
TlsConfig,
};
use compute_api::spec::{
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PageserverProtocol,
PgIdent, RemoteExtSpec, Role,
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent,
RemoteExtSpec, Role,
};
// re-export these, because they're used in the reconfigure() function
pub use compute_api::spec::{PageserverConnectionInfo, PageserverShardConnectionInfo};
use jsonwebtoken::jwk::{
AlgorithmParameters, CommonParameters, EllipticCurve, Jwk, JwkSet, KeyAlgorithm, KeyOperations,
OctetKeyPairParameters, OctetKeyPairType, PublicKeyUse,
};
use nix::sys::signal::{Signal, kill};
use pageserver_api::shard::ShardStripeSize;
use pem::Pem;
use reqwest::header::CONTENT_TYPE;
use safekeeper_api::PgMajorVersion;
@@ -74,9 +78,7 @@ use sha2::{Digest, Sha256};
use spki::der::Decode;
use spki::{SubjectPublicKeyInfo, SubjectPublicKeyInfoRef};
use tracing::debug;
use url::Host;
use utils::id::{NodeId, TenantId, TimelineId};
use utils::shard::ShardStripeSize;
use crate::local_env::LocalEnv;
use crate::postgresql_conf::PostgresConf;
@@ -99,7 +101,6 @@ pub struct EndpointConf {
features: Vec<ComputeFeature>,
cluster: Option<Cluster>,
compute_ctl_config: ComputeCtlConfig,
privileged_role_name: Option<String>,
}
//
@@ -200,7 +201,6 @@ impl ComputeControlPlane {
grpc: bool,
skip_pg_catalog_updates: bool,
drop_subscriptions_before_start: bool,
privileged_role_name: Option<String>,
) -> Result<Arc<Endpoint>> {
let pg_port = pg_port.unwrap_or_else(|| self.get_port());
let external_http_port = external_http_port.unwrap_or_else(|| self.get_port() + 1);
@@ -238,7 +238,6 @@ impl ComputeControlPlane {
features: vec![],
cluster: None,
compute_ctl_config: compute_ctl_config.clone(),
privileged_role_name: privileged_role_name.clone(),
});
ep.create_endpoint_dir()?;
@@ -260,7 +259,6 @@ impl ComputeControlPlane {
features: vec![],
cluster: None,
compute_ctl_config,
privileged_role_name,
})?,
)?;
std::fs::write(
@@ -336,9 +334,6 @@ pub struct Endpoint {
/// The compute_ctl config for the endpoint's compute.
compute_ctl_config: ComputeCtlConfig,
/// The name of the privileged role for the endpoint.
privileged_role_name: Option<String>,
}
#[derive(PartialEq, Eq)]
@@ -387,7 +382,7 @@ pub struct EndpointStartArgs {
pub endpoint_storage_addr: String,
pub safekeepers_generation: Option<SafekeeperGeneration>,
pub safekeepers: Vec<NodeId>,
pub pageservers: Vec<(PageserverProtocol, Host, u16)>,
pub pageserver_conninfo: PageserverConnectionInfo,
pub remote_ext_base_url: Option<String>,
pub shard_stripe_size: usize,
pub create_test_user: bool,
@@ -439,7 +434,6 @@ impl Endpoint {
features: conf.features,
cluster: conf.cluster,
compute_ctl_config: conf.compute_ctl_config,
privileged_role_name: conf.privileged_role_name,
})
}
@@ -472,7 +466,7 @@ impl Endpoint {
conf.append("max_connections", "100");
conf.append("wal_level", "logical");
// wal_sender_timeout is the maximum time to wait for WAL replication.
// It also defines how often the walreceiver will send a feedback message to the wal sender.
// It also defines how often the walreciever will send a feedback message to the wal sender.
conf.append("wal_sender_timeout", "5s");
conf.append("listen_addresses", &self.pg_address.ip().to_string());
conf.append("port", &self.pg_address.port().to_string());
@@ -662,14 +656,6 @@ impl Endpoint {
}
}
fn build_pageserver_connstr(pageservers: &[(PageserverProtocol, Host, u16)]) -> String {
pageservers
.iter()
.map(|(scheme, host, port)| format!("{scheme}://no_user@{host}:{port}"))
.collect::<Vec<_>>()
.join(",")
}
/// Map safekeepers ids to the actual connection strings.
fn build_safekeepers_connstrs(&self, sk_ids: Vec<NodeId>) -> Result<Vec<String>> {
let mut safekeeper_connstrings = Vec::new();
@@ -715,9 +701,6 @@ impl Endpoint {
std::fs::remove_dir_all(self.pgdata())?;
}
let pageserver_connstring = Self::build_pageserver_connstr(&args.pageservers);
assert!(!pageserver_connstring.is_empty());
let safekeeper_connstrings = self.build_safekeepers_connstrs(args.safekeepers)?;
// check for file remote_extensions_spec.json
@@ -776,7 +759,7 @@ impl Endpoint {
branch_id: None,
endpoint_id: Some(self.endpoint_id.clone()),
mode: self.mode,
pageserver_connstring: Some(pageserver_connstring),
pageserver_connection_info: Some(args.pageserver_conninfo),
safekeepers_generation: args.safekeepers_generation.map(|g| g.into_inner()),
safekeeper_connstrings,
storage_auth_token: args.auth_token.clone(),
@@ -878,10 +861,6 @@ impl Endpoint {
cmd.arg("--dev");
}
if let Some(privileged_role_name) = self.privileged_role_name.clone() {
cmd.args(["--privileged-role-name", &privileged_role_name]);
}
let child = cmd.spawn()?;
// set up a scopeguard to kill & wait for the child in case we panic or bail below
let child = scopeguard::guard(child, |mut child| {
@@ -935,8 +914,7 @@ impl Endpoint {
ComputeStatus::Empty
| ComputeStatus::ConfigurationPending
| ComputeStatus::Configuration
| ComputeStatus::TerminationPendingFast
| ComputeStatus::TerminationPendingImmediate
| ComputeStatus::TerminationPending { .. }
| ComputeStatus::Terminated => {
bail!("unexpected compute status: {:?}", state.status)
}
@@ -994,7 +972,7 @@ impl Endpoint {
pub async fn reconfigure(
&self,
pageservers: Option<Vec<(PageserverProtocol, Host, u16)>>,
pageserver_conninfo: Option<PageserverConnectionInfo>,
stripe_size: Option<ShardStripeSize>,
safekeepers: Option<Vec<NodeId>>,
safekeeper_generation: Option<SafekeeperGeneration>,
@@ -1010,15 +988,17 @@ impl Endpoint {
let postgresql_conf = self.read_postgresql_conf()?;
spec.cluster.postgresql_conf = Some(postgresql_conf);
// If pageservers are not specified, don't change them.
if let Some(pageservers) = pageservers {
anyhow::ensure!(!pageservers.is_empty(), "no pageservers provided");
let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
spec.pageserver_connstring = Some(pageserver_connstr);
if stripe_size.is_some() {
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
}
if let Some(pageserver_conninfo) = pageserver_conninfo {
// If pageservers are provided, we need to ensure that they are not empty.
// This is a requirement for the compute_ctl configuration.
anyhow::ensure!(
!pageserver_conninfo.shards.is_empty(),
"no pageservers provided"
);
spec.pageserver_connection_info = Some(pageserver_conninfo);
}
if stripe_size.is_some() {
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
}
// If safekeepers are not specified, don't change them.
@@ -1067,7 +1047,7 @@ impl Endpoint {
pub async fn reconfigure_pageservers(
&self,
pageservers: Vec<(PageserverProtocol, Host, u16)>,
pageservers: PageserverConnectionInfo,
stripe_size: Option<ShardStripeSize>,
) -> Result<()> {
self.reconfigure(Some(pageservers), stripe_size, None, None)

View File

@@ -217,9 +217,6 @@ pub struct NeonStorageControllerConf {
pub posthog_config: Option<PostHogConfig>,
pub kick_secondary_downloads: Option<bool>,
#[serde(with = "humantime_serde")]
pub shard_split_request_timeout: Option<Duration>,
}
impl NeonStorageControllerConf {
@@ -253,7 +250,6 @@ impl Default for NeonStorageControllerConf {
timeline_safekeeper_count: None,
posthog_config: None,
kick_secondary_downloads: None,
shard_split_request_timeout: None,
}
}
}

View File

@@ -303,7 +303,7 @@ impl PageServerNode {
async fn start_node(&self, retry_timeout: &Duration) -> anyhow::Result<()> {
// TODO: using a thread here because start_process() is not async but we need to call check_status()
let datadir = self.repo_path();
println!(
print!(
"Starting pageserver node {} at '{}' in {:?}, retrying for {:?}",
self.conf.id,
self.pg_connection_config.raw_address(),
@@ -452,12 +452,6 @@ impl PageServerNode {
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'image_creation_threshold' as non zero integer")?,
// HADRON
image_layer_force_creation_period: settings
.remove("image_layer_force_creation_period")
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'image_layer_force_creation_period' as duration")?,
image_layer_creation_check_threshold: settings
.remove("image_layer_creation_check_threshold")
.map(|x| x.parse::<u8>())

View File

@@ -127,7 +127,7 @@ impl SafekeeperNode {
extra_opts: &[String],
retry_timeout: &Duration,
) -> anyhow::Result<()> {
println!(
print!(
"Starting safekeeper at '{}' in '{}', retrying for {:?}",
self.pg_connection_config.raw_address(),
self.datadir_path().display(),

View File

@@ -56,7 +56,6 @@ pub struct NeonStorageControllerStartArgs {
pub instance_id: u8,
pub base_port: Option<u16>,
pub start_timeout: humantime::Duration,
pub handle_ps_local_disk_loss: Option<bool>,
}
impl NeonStorageControllerStartArgs {
@@ -65,7 +64,6 @@ impl NeonStorageControllerStartArgs {
instance_id: 1,
base_port: None,
start_timeout,
handle_ps_local_disk_loss: None,
}
}
}
@@ -650,13 +648,6 @@ impl StorageController {
args.push(format!("--timeline-safekeeper-count={sk_cnt}"));
}
if let Some(duration) = self.config.shard_split_request_timeout {
args.push(format!(
"--shard-split-request-timeout={}",
humantime::Duration::from(duration)
));
}
let mut envs = vec![
("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
@@ -669,11 +660,7 @@ impl StorageController {
));
}
println!("Starting storage controller at {scheme}://{host}:{listen_port}");
if start_args.handle_ps_local_disk_loss.unwrap_or_default() {
args.push("--handle-ps-local-disk-loss".to_string());
}
println!("Starting storage controller");
background_process::start_process(
COMMAND,

View File

@@ -14,7 +14,6 @@ humantime.workspace = true
pageserver_api.workspace = true
pageserver_client.workspace = true
reqwest.workspace = true
safekeeper_api.workspace=true
serde_json = { workspace = true, features = ["raw_value"] }
storage_controller_client.workspace = true
tokio.workspace = true

View File

@@ -11,7 +11,7 @@ use pageserver_api::controller_api::{
PlacementPolicy, SafekeeperDescribeResponse, SafekeeperSchedulingPolicyRequest,
ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse,
SkSchedulingPolicy, TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
TenantShardMigrateRequest, TenantShardMigrateResponse, TimelineSafekeeperMigrateRequest,
TenantShardMigrateRequest, TenantShardMigrateResponse,
};
use pageserver_api::models::{
EvictionPolicy, EvictionPolicyLayerAccessThreshold, ShardParameters, TenantConfig,
@@ -21,7 +21,6 @@ use pageserver_api::models::{
use pageserver_api::shard::{ShardStripeSize, TenantShardId};
use pageserver_client::mgmt_api::{self};
use reqwest::{Certificate, Method, StatusCode, Url};
use safekeeper_api::models::TimelineLocateResponse;
use storage_controller_client::control_api::Client;
use utils::id::{NodeId, TenantId, TimelineId};
@@ -76,12 +75,6 @@ enum Command {
NodeStartDelete {
#[arg(long)]
node_id: NodeId,
/// When `force` is true, skip waiting for shards to prewarm during migration.
/// This can significantly speed up node deletion since prewarming all shards
/// can take considerable time, but may result in slower initial access to
/// migrated shards until they warm up naturally.
#[arg(long)]
force: bool,
},
/// Cancel deletion of the specified pageserver and wait for `timeout`
/// for the operation to be canceled. May be retried.
@@ -286,23 +279,6 @@ enum Command {
#[arg(long)]
concurrency: Option<usize>,
},
/// Locate safekeepers for a timeline from the storcon DB.
TimelineLocate {
#[arg(long)]
tenant_id: TenantId,
#[arg(long)]
timeline_id: TimelineId,
},
/// Migrate a timeline to a new set of safekeepers
TimelineSafekeeperMigrate {
#[arg(long)]
tenant_id: TenantId,
#[arg(long)]
timeline_id: TimelineId,
/// Example: --new-sk-set 1,2,3
#[arg(long, required = true, value_delimiter = ',')]
new_sk_set: Vec<NodeId>,
},
}
#[derive(Parser)]
@@ -482,7 +458,6 @@ async fn main() -> anyhow::Result<()> {
listen_http_port,
listen_https_port,
availability_zone_id: AvailabilityZone(availability_zone_id),
node_ip_addr: None,
}),
)
.await?;
@@ -958,14 +933,13 @@ async fn main() -> anyhow::Result<()> {
.dispatch::<(), ()>(Method::DELETE, format!("control/v1/node/{node_id}"), None)
.await?;
}
Command::NodeStartDelete { node_id, force } => {
let query = if force {
format!("control/v1/node/{node_id}/delete?force=true")
} else {
format!("control/v1/node/{node_id}/delete")
};
Command::NodeStartDelete { node_id } => {
storcon_client
.dispatch::<(), ()>(Method::PUT, query, None)
.dispatch::<(), ()>(
Method::PUT,
format!("control/v1/node/{node_id}/delete"),
None,
)
.await?;
println!("Delete started for {node_id}");
}
@@ -1350,7 +1324,7 @@ async fn main() -> anyhow::Result<()> {
concurrency,
} => {
let mut path = format!(
"v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers",
"/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers",
);
if let Some(c) = concurrency {
@@ -1361,41 +1335,6 @@ async fn main() -> anyhow::Result<()> {
.dispatch::<(), ()>(Method::POST, path, None)
.await?;
}
Command::TimelineLocate {
tenant_id,
timeline_id,
} => {
let path = format!("debug/v1/tenant/{tenant_id}/timeline/{timeline_id}/locate");
let resp = storcon_client
.dispatch::<(), TimelineLocateResponse>(Method::GET, path, None)
.await?;
let sk_set = resp.sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>();
let new_sk_set = resp
.new_sk_set
.as_ref()
.map(|ids| ids.iter().map(|id| id.0 as i64).collect::<Vec<_>>());
println!("generation = {}", resp.generation);
println!("sk_set = {sk_set:?}");
println!("new_sk_set = {new_sk_set:?}");
}
Command::TimelineSafekeeperMigrate {
tenant_id,
timeline_id,
new_sk_set,
} => {
let path = format!("v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate");
storcon_client
.dispatch::<_, ()>(
Method::POST,
path,
Some(TimelineSafekeeperMigrateRequest { new_sk_set }),
)
.await?;
}
}
Ok(())

View File

@@ -35,7 +35,6 @@ reason = "The paste crate is a build-only dependency with no runtime components.
# More documentation for the licenses section can be found here:
# https://embarkstudios.github.io/cargo-deny/checks/licenses/cfg.html
[licenses]
version = 2
allow = [
"0BSD",
"Apache-2.0",

View File

@@ -54,16 +54,14 @@ else
printf '%s\n' "${result}" | jq .
fi
if [[ "${RUN_PARALLEL:-false}" != "true" ]]; then
echo "Check if a timeline present"
PARAMS=(
-X GET
-H "Content-Type: application/json"
"http://pageserver:9898/v1/tenant/${tenant_id}/timeline"
)
timeline_id=$(curl "${PARAMS[@]}" | jq -r .[0].timeline_id)
fi
if [[ -z "${timeline_id:-}" || "${timeline_id:-}" = null ]]; then
echo "Check if a timeline present"
PARAMS=(
-X GET
-H "Content-Type: application/json"
"http://pageserver:9898/v1/tenant/${tenant_id}/timeline"
)
timeline_id=$(curl "${PARAMS[@]}" | jq -r .[0].timeline_id)
if [[ -z "${timeline_id}" || "${timeline_id}" = null ]]; then
generate_id timeline_id
PARAMS=(
-sbf

Some files were not shown because too many files have changed in this diff Show More