Compare commits

..

13 Commits

Author SHA1 Message Date
Conrad Ludgate
fe7254b3fd update dashmap with new unsafe raw shards api 2024-06-19 09:03:59 +01:00
Alex Chi Z
68476bb4ba feat(pageserver): add iterator API for btree reader (#8083)
The new image iterator and delta iterator uses an iterator-based API.
https://github.com/neondatabase/neon/pull/8006 / part of
https://github.com/neondatabase/neon/issues/8002

This requires the underlying thing (the btree) to have an iterator API,
and the iterator should have a type name so that it can be stored
somewhere.

```rust
pub struct DeltaLayerIterator {
  index_iterator: BTreeIterator
}
```

versus

```rust
pub struct DeltaLayerIterator {
  index_iterator: impl Stream<....>
}
```

(this requires nightly flag and still buggy in the Rust compiler)


There are multiple ways to achieve this:

1. Either write a BTreeIterator from scratch that provides `async next`.
This is the most efficient way to do that.
2. Or wrap the current `get_stream` API, which is the current approach
in the pull request.

In the future, we should do (1), and the `get_stream` API should be
refactored to use the iterator API. With (2), we have to wrap the
`get_stream` API with `Pin<Box<dyn Stream>>`, where we have the overhead
of dynamic dispatch. However, (2) needs a rewrite of the `visit`
function, which would take some time to write and review. I'd like to
define this iterator API first and work on a real iterator API later.

## Summary of changes

Add `DiskBtreeIterator` and related tests.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-06-18 20:02:57 +00:00
Arseny Sher
6bb8b1d7c2 Remove dead code from walproposer_pg.c
Now that logical walsenders fetch WAL from safekeepers recovery in walproposer
is not needed. Fixes warnings.
2024-06-18 21:12:02 +03:00
Yuchen Liang
30b890e378 feat(pageserver): use leases to temporarily block gc (#8084)
Part of #7497, extracts from #7996, closes #8063.

## Problem

With the LSN lease API introduced in
https://github.com/neondatabase/neon/issues/7808, we want to implement
the real lease logic so that GC will
keep all the layers needed to reconstruct all pages at all the leased
LSNs with valid leases at a given time.

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-06-18 17:37:06 +00:00
Heikki Linnakangas
560627b525 Replace a few references to Zenith with neon 2024-06-18 20:01:32 +03:00
Heikki Linnakangas
1c1b4b0c04 Add a bunch of items for new changes that we've made 2024-06-18 20:01:32 +03:00
Heikki Linnakangas
b774ab54d4 Remove obsolete ones
- Relation size cache was moved to extension

- the changes in visibilitymap.c and freespace.c became unnecessary
  with v16, thanks to changes in upstream code

- WALProposer was moved to extension

- The hack in ReadBuffer_common to not throw an error on unexpected
  data beyond EOF was removed in v16 rebase. We haven't seen such
  errors, so I guess that was some early issue that was fixed long
  time ago.

- The ginfast.c diff was made unnecessary by upstream commit 56b662523f
2024-06-18 20:01:32 +03:00
Heikki Linnakangas
33a09946fc Prefetching has been implemented 2024-06-18 20:01:32 +03:00
Heikki Linnakangas
0396ed67f7 Update comments on various items
To update things that have changed since this was written, and to
reflect discussions at offsite meeting.
2024-06-18 20:01:32 +03:00
Heikki Linnakangas
8ee6724167 Update overview section to reflect current code organization 2024-06-18 20:01:32 +03:00
dependabot[bot]
8a9fa0a4e4 build(deps): bump urllib3 from 1.26.18 to 1.26.19 (#8086) 2024-06-18 16:40:46 +01:00
dependabot[bot]
cf60e4c0c5 build(deps): bump ws from 8.16.0 to 8.17.1 in /test_runner/pg_clients/typescript/serverless-driver (#8087) 2024-06-18 16:40:27 +01:00
Arpad Müller
68a2298973 Add support to specifying storage account in AzureConfig (#8090)
We want to be able to specify the storage account via the toml
configuration, so that we can connect to multiple storage accounts in
the same process.

https://neondb.slack.com/archives/C06SJG60FRB/p1718702144270139
2024-06-18 16:03:23 +02:00
33 changed files with 685 additions and 546 deletions

18
Cargo.lock generated
View File

@@ -1598,6 +1598,20 @@ dependencies = [
"parking_lot_core 0.9.8",
]
[[package]]
name = "dashmap"
version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23fadfd577acfd4485fb258011b0fd080882ea83359b6fd41304900b94ccf487"
dependencies = [
"cfg-if",
"crossbeam-utils",
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core 0.9.8",
]
[[package]]
name = "data-encoding"
version = "2.4.0"
@@ -2848,7 +2862,7 @@ version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4644821e1c3d7a560fe13d842d13f587c07348a1a05d3a797152d41c90c56df2"
dependencies = [
"dashmap",
"dashmap 5.5.0",
"hashbrown 0.13.2",
]
@@ -4296,7 +4310,7 @@ dependencies = [
"clap",
"consumption_metrics",
"crossbeam-deque",
"dashmap",
"dashmap 6.0.0",
"env_logger",
"fallible-iterator",
"framed-websockets",

View File

@@ -77,7 +77,7 @@ const_format = "0.2"
crc32c = "0.6"
crossbeam-deque = "0.8.5"
crossbeam-utils = "0.8.5"
dashmap = { version = "5.5.0", features = ["raw-api"] }
dashmap = { version = "6.0", features = ["raw-api"] }
either = "1.8"
enum-map = "2.4.2"
enumset = "1.0.12"

View File

@@ -862,13 +862,20 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
let allow_multiple = sub_args.get_flag("allow-multiple");
// If --safekeepers argument is given, use only the listed
// safekeeper nodes; otherwise all from the env.
let safekeepers = if let Some(safekeepers) = parse_safekeepers(&sub_args)? {
safekeepers
} else {
env.safekeepers.iter().map(|sk| sk.id).collect()
};
// If --safekeepers argument is given, use only the listed safekeeper nodes.
let safekeepers =
if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
let mut safekeepers: Vec<NodeId> = Vec::new();
for sk_id in safekeepers_str.split(',').map(str::trim) {
let sk_id = NodeId(u64::from_str(sk_id).map_err(|_| {
anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list")
})?);
safekeepers.push(sk_id);
}
safekeepers
} else {
env.safekeepers.iter().map(|sk| sk.id).collect()
};
let endpoint = cplane
.endpoints
@@ -972,10 +979,7 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
})
.collect::<Vec<_>>()
};
// If --safekeepers argument is given, use only the listed
// safekeeper nodes; otherwise all from the env.
let safekeepers = parse_safekeepers(&sub_args)?;
endpoint.reconfigure(pageservers, None, safekeepers).await?;
endpoint.reconfigure(pageservers, None).await?;
}
"stop" => {
let endpoint_id = sub_args
@@ -997,23 +1001,6 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
Ok(())
}
/// Parse --safekeepers as list of safekeeper ids.
fn parse_safekeepers(sub_args: &ArgMatches) -> Result<Option<Vec<NodeId>>> {
if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
let mut safekeepers: Vec<NodeId> = Vec::new();
for sk_id in safekeepers_str.split(',').map(str::trim) {
let sk_id = NodeId(
u64::from_str(sk_id)
.map_err(|_| anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list"))?,
);
safekeepers.push(sk_id);
}
Ok(Some(safekeepers))
} else {
Ok(None)
}
}
fn handle_mappings(sub_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
let (sub_name, sub_args) = match sub_match.subcommand() {
Some(ep_subcommand_data) => ep_subcommand_data,
@@ -1586,7 +1573,7 @@ fn cli() -> Command {
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
.arg(endpoint_id_arg.clone())
.arg(endpoint_pageserver_id_arg.clone())
.arg(safekeepers_arg.clone())
.arg(safekeepers_arg)
.arg(remote_ext_config_args)
.arg(create_test_user)
.arg(allow_multiple.clone())
@@ -1594,7 +1581,6 @@ fn cli() -> Command {
.subcommand(Command::new("reconfigure")
.about("Reconfigure the endpoint")
.arg(endpoint_pageserver_id_arg)
.arg(safekeepers_arg)
.arg(endpoint_id_arg.clone())
.arg(tenant_id_arg.clone())
)

View File

@@ -499,23 +499,6 @@ impl Endpoint {
.join(",")
}
/// Map safekeepers ids to the actual connection strings.
fn build_safekeepers_connstrs(&self, sk_ids: Vec<NodeId>) -> Result<Vec<String>> {
let mut safekeeper_connstrings = Vec::new();
if self.mode == ComputeMode::Primary {
for sk_id in sk_ids {
let sk = self
.env
.safekeepers
.iter()
.find(|node| node.id == sk_id)
.ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
}
}
Ok(safekeeper_connstrings)
}
pub async fn start(
&self,
auth_token: &Option<String>,
@@ -540,7 +523,18 @@ impl Endpoint {
let pageserver_connstring = Self::build_pageserver_connstr(&pageservers);
assert!(!pageserver_connstring.is_empty());
let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;
let mut safekeeper_connstrings = Vec::new();
if self.mode == ComputeMode::Primary {
for sk_id in safekeepers {
let sk = self
.env
.safekeepers
.iter()
.find(|node| node.id == sk_id)
.ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
}
}
// check for file remote_extensions_spec.json
// if it is present, read it and pass to compute_ctl
@@ -747,7 +741,6 @@ impl Endpoint {
&self,
mut pageservers: Vec<(Host, u16)>,
stripe_size: Option<ShardStripeSize>,
safekeepers: Option<Vec<NodeId>>,
) -> Result<()> {
let mut spec: ComputeSpec = {
let spec_path = self.endpoint_path().join("spec.json");
@@ -782,12 +775,6 @@ impl Endpoint {
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
}
// If safekeepers are not specified, don't change them.
if let Some(safekeepers) = safekeepers {
let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;
spec.safekeeper_connstrings = safekeeper_connstrings;
}
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()

View File

@@ -383,6 +383,10 @@ impl PageServerNode {
.map(|x| x.parse::<AuxFilePolicy>())
.transpose()
.context("Failed to parse 'switch_aux_file_policy'")?,
lsn_lease_length: settings.remove("lsn_lease_length").map(|x| x.to_string()),
lsn_lease_length_for_ts: settings
.remove("lsn_lease_length_for_ts")
.map(|x| x.to_string()),
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")
@@ -506,6 +510,10 @@ impl PageServerNode {
.map(|x| x.parse::<AuxFilePolicy>())
.transpose()
.context("Failed to parse 'switch_aux_file_policy'")?,
lsn_lease_length: settings.remove("lsn_lease_length").map(|x| x.to_string()),
lsn_lease_length_for_ts: settings
.remove("lsn_lease_length_for_ts")
.map(|x| x.to_string()),
}
};

View File

@@ -11,15 +11,28 @@ page server. We currently use the same binary for both, with --wal-redo runtime
the WAL redo mode. Some PostgreSQL changes are needed in the compute node, while others are just for
the WAL redo process.
In addition to core PostgreSQL changes, there is a Neon extension in contrib/neon, to hook into the
smgr interface. Once all the core changes have been submitted to upstream or eliminated some other
way, the extension could live outside the postgres repository and build against vanilla PostgreSQL.
In addition to core PostgreSQL changes, there is a Neon extension in the pgxn/neon directory that
hooks into the smgr interface, and rmgr extension in pgxn/neon_rmgr. The extensions are loaded into
the Postgres processes with shared_preload_libraries. Most of the Neon-specific code is in the
extensions, and for any new features, that is preferred over modifying core PostgreSQL code.
Below is a list of all the PostgreSQL source code changes, categorized into changes needed for
compute, and changes needed for the WAL redo process:
# Changes for Compute node
## Prefetching
There are changes in many places to perform prefetching, for example for sequential scans. Neon
doesn't benefit from OS readahead, and the latency to pageservers is quite high compared to local
disk, so prefetching is critical for performance, also for sequential scans.
### How to get rid of the patch
Upcoming "streaming read" work in v17 might simplify this. And async I/O work in v18 will hopefully
do more.
## Add t_cid to heap WAL records
```
@@ -37,54 +50,11 @@ The problem is that the XLOG_HEAP_INSERT record does not include the command id
Bite the bullet and submit the patch to PostgreSQL, to add the t_cid to the WAL records. It makes the WAL records larger, which could make this unpopular in the PostgreSQL community. However, it might simplify some logical decoding code; Andres Freund briefly mentioned in PGCon 2022 discussion on Heikki's Neon presentation that logical decoding currently needs to jump through some hoops to reconstruct the same information.
Update from Heikki (2024-04-17): I tried to write an upstream patch for that, to use the t_cid field for logical decoding, but it was not as straightforward as it first sounded.
### Alternatives
Perhaps we could write an extra WAL record with the t_cid information, when a page is evicted that contains rows that were touched a transaction that's still running. However, that seems very complicated.
## ginfast.c
```
diff --git a/src/backend/access/gin/ginfast.c b/src/backend/access/gin/ginfast.c
index e0d9940946..2d964c02e9 100644
--- a/src/backend/access/gin/ginfast.c
+++ b/src/backend/access/gin/ginfast.c
@@ -285,6 +285,17 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
memset(&sublist, 0, sizeof(GinMetaPageData));
makeSublist(index, collector->tuples, collector->ntuples, &sublist);
+ if (metadata->head != InvalidBlockNumber)
+ {
+ /*
+ * ZENITH: Get buffer before XLogBeginInsert() to avoid recursive call
+ * of XLogBeginInsert(). Reading a new buffer might evict a dirty page from
+ * the buffer cache, and if that page happens to be an FSM or VM page, zenith_write()
+ * will try to WAL-log an image of the page.
+ */
+ buffer = ReadBuffer(index, metadata->tail);
+ }
+
if (needWal)
XLogBeginInsert();
@@ -316,7 +327,6 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
data.prevTail = metadata->tail;
data.newRightlink = sublist.head;
- buffer = ReadBuffer(index, metadata->tail);
LockBuffer(buffer, GIN_EXCLUSIVE);
page = BufferGetPage(buffer);
```
The problem is explained in the comment above
### How to get rid of the patch
Can we stop WAL-logging FSM or VM pages? Or delay the WAL logging until we're out of the critical
section or something.
Maybe some bigger rewrite of FSM and VM would help to avoid WAL-logging FSM and VM page images?
## Mark index builds that use buffer manager without logging explicitly
```
@@ -95,6 +65,8 @@ Maybe some bigger rewrite of FSM and VM would help to avoid WAL-logging FSM and
also some changes in src/backend/storage/smgr/smgr.c
```
pgvector 0.6.0 also needs a similar change, which would be very nice to get rid of too.
When a GIN index is built, for example, it is built by inserting the entries into the index more or
less normally, but without WAL-logging anything. After the index has been built, we iterate through
all pages and write them to the WAL. That doesn't work for Neon, because if a page is not WAL-logged
@@ -109,6 +81,10 @@ an operation: `smgr_start_unlogged_build`, `smgr_finish_unlogged_build_phase_1`
I think it would make sense to be more explicit about that in PostgreSQL too. So extract these
changes to a patch and post to pgsql-hackers.
Perhaps we could deduce that an unlogged index build has started when we see a page being evicted
with zero LSN. How to be sure it's an unlogged index build rather than a bug? Currently we have a
check for that and PANIC if we see page with zero LSN being evicted. And how do we detect when the
index build has finished? See https://github.com/neondatabase/neon/pull/7440 for an attempt at that.
## Track last-written page LSN
@@ -140,57 +116,6 @@ The old method is still available, though.
Wait until v15?
## Cache relation sizes
The Neon extension contains a little cache for smgrnblocks() and smgrexists() calls, to avoid going
to the page server every time. It might be useful to cache those in PostgreSQL, maybe in the
relcache? (I think we do cache nblocks in relcache already, check why that's not good enough for
Neon)
## Use buffer manager when extending VM or FSM
```
src/backend/storage/freespace/freespace.c | 14 +-
src/backend/access/heap/visibilitymap.c | 15 +-
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c
index e198df65d8..addfe93eac 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -652,10 +652,19 @@ vm_extend(Relation rel, BlockNumber vm_nblocks)
/* Now extend the file */
while (vm_nblocks_now < vm_nblocks)
{
- PageSetChecksumInplace((Page) pg.data, vm_nblocks_now);
+ /*
+ * ZENITH: Initialize VM pages through buffer cache to prevent loading
+ * them from pageserver.
+ */
+ Buffer buffer = ReadBufferExtended(rel, VISIBILITYMAP_FORKNUM, P_NEW,
+ RBM_ZERO_AND_LOCK, NULL);
+ Page page = BufferGetPage(buffer);
+
+ PageInit((Page) page, BLCKSZ, 0);
+ PageSetChecksumInplace(page, vm_nblocks_now);
+ MarkBufferDirty(buffer);
+ UnlockReleaseBuffer(buffer);
- smgrextend(rel->rd_smgr, VISIBILITYMAP_FORKNUM, vm_nblocks_now,
- pg.data, false);
vm_nblocks_now++;
}
```
### Problem we're trying to solve
???
### How to get rid of the patch
Maybe this would be a reasonable change in PostgreSQL too?
## Allow startup without reading checkpoint record
In Neon, the compute node is stateless. So when we are launching compute node, we need to provide
@@ -231,7 +156,7 @@ index 0415df9ccb..9f9db3c8bc 100644
* crash we can lose (skip over) as many values as we pre-logged.
*/
-#define SEQ_LOG_VALS 32
+/* Zenith XXX: to ensure sequence order of sequence in Zenith we need to WAL log each sequence update. */
+/* Neon XXX: to ensure sequence order of sequence in Zenith we need to WAL log each sequence update. */
+/* #define SEQ_LOG_VALS 32 */
+#define SEQ_LOG_VALS 0
```
@@ -250,66 +175,6 @@ would be weird if the sequence moved backwards though, think of PITR.
Or add a GUC for the amount to prefix to PostgreSQL, and force it to 1 in Neon.
## Walproposer
```
src/Makefile | 1 +
src/backend/replication/libpqwalproposer/Makefile | 37 +
src/backend/replication/libpqwalproposer/libpqwalproposer.c | 416 ++++++++++++
src/backend/postmaster/bgworker.c | 4 +
src/backend/postmaster/postmaster.c | 6 +
src/backend/replication/Makefile | 4 +-
src/backend/replication/walproposer.c | 2350 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/backend/replication/walproposer_utils.c | 402 +++++++++++
src/backend/replication/walreceiver.c | 7 +
src/backend/replication/walsender.c | 320 ++++++---
src/backend/storage/ipc/ipci.c | 6 +
src/include/replication/walproposer.h | 565 ++++++++++++++++
```
WAL proposer is communicating with safekeeper and ensures WAL durability by quorum writes. It is
currently implemented as patch to standard WAL sender.
### How to get rid of the patch
Refactor into an extension. Submit hooks or APIs into upstream if necessary.
@MMeent did some work on this already: https://github.com/neondatabase/postgres/pull/96
## Ignore unexpected data beyond EOF in bufmgr.c
```
@@ -922,11 +928,14 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
*/
bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
if (!PageIsNew((Page) bufBlock))
- ereport(ERROR,
+ {
+ // XXX-ZENITH
+ MemSet((char *) bufBlock, 0, BLCKSZ);
+ ereport(DEBUG1,
(errmsg("unexpected data beyond EOF in block %u of relation %s",
blockNum, relpath(smgr->smgr_rnode, forkNum)),
errhint("This has been seen to occur with buggy kernels; consider updating your system.")));
-
+ }
/*
* We *must* do smgrextend before succeeding, else the page will not
* be reserved by the kernel, and the next P_NEW call will decide to
```
PostgreSQL is a bit sloppy with extending relations. Usually, the relation is extended with zeros
first, then the page is filled, and finally the new page WAL-logged. But if multiple backends extend
a relation at the same time, the pages can be WAL-logged in different order.
I'm not sure what scenario exactly required this change in Neon, though.
### How to get rid of the patch
Submit patches to pgsql-hackers, to tighten up the WAL-logging around relation extension. It's a bit
confusing even in PostgreSQL. Maybe WAL log the intention to extend first, then extend the relation,
and finally WAL-log that the extension succeeded.
## Make smgr interface available to extensions
```
@@ -321,6 +186,8 @@ and finally WAL-log that the extension succeeded.
Submit to upstream. This could be useful for the Disk Encryption patches too, or for compression.
We have submitted this to upstream, but it's moving at glacial a speed.
https://commitfest.postgresql.org/47/4428/
## Added relpersistence argument to smgropen()
@@ -444,6 +311,148 @@ Ignore it. This is only needed for disaster recovery, so once we've eliminated a
patches, we can just keep it around as a patch or as separate branch in a repo.
## pg_waldump flags to ignore errors
After creating a new project or branch in Neon, the first timeline can begin in the middle of a WAL segment. pg_waldump chokes on that, so we added some flags to make it possible to ignore errors.
### How to get rid of the patch
Like previous one, ignore it.
## Backpressure if pageserver doesn't ingest WAL fast enough
```
@@ -3200,6 +3202,7 @@ ProcessInterrupts(void)
return;
InterruptPending = false;
+retry:
if (ProcDiePending)
{
ProcDiePending = false;
@@ -3447,6 +3450,13 @@ ProcessInterrupts(void)
if (ParallelApplyMessagePending)
HandleParallelApplyMessages();
+
+ /* Call registered callback if any */
+ if (ProcessInterruptsCallback)
+ {
+ if (ProcessInterruptsCallback())
+ goto retry;
+ }
}
```
### How to get rid of the patch
Submit a patch to upstream, for a hook in ProcessInterrupts. Could be useful for other extensions
too.
## SLRU on-demand download
```
src/backend/access/transam/slru.c | 105 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------
1 file changed, 92 insertions(+), 13 deletions(-)
```
### Problem we're trying to solve
Previously, SLRU files were included in the basebackup, but the total size of them can be large,
several GB, and downloading them all made the startup time too long.
### Alternatives
FUSE hook or LD_PRELOAD trick to intercept the reads on SLRU files
## WAL-log an all-zeros page as one large hole
- In XLogRecordAssemble()
### Problem we're trying to solve
This change was made in v16. Starting with v16, when PostgreSQL extends a relation, it first extends
it with zeros, and it can extend the relation more than one block at a time. The all-zeros page is WAL-ogged, but it's very wasteful to include 8 kB of zeros in the WAL for that. This hack was made so that we WAL logged a compact record with a whole-page "hole". However, PostgreSQL has assertions that prevent that such WAL records from being replayed, so this breaks compatibility such that unmodified PostreSQL cannot process Neon-generated WAL.
### How to get rid of the patch
Find another compact representation for a full-page image of an all-zeros page. A compressed image perhaps.
## Shut down walproposer after checkpointer
```
+ /* Neon: Also allow walproposer background worker to be treated like a WAL sender, so that it's shut down last */
+ if ((bp->bkend_type == BACKEND_TYPE_NORMAL || bp->bkend_type == BACKEND_TYPE_BGWORKER) &&
```
This changes was needed so that postmaster shuts down the walproposer process only after the shutdown checkpoint record is written. Otherwise, the shutdown record will never make it to the safekeepers.
### How to get rid of the patch
Do a bigger refactoring of the postmaster state machine, such that a background worker can specify
the shutdown ordering by itself. The postmaster state machine has grown pretty complicated, and
would benefit from a refactoring for the sake of readability anyway.
## EXPLAIN changes for prefetch and LFC
### How to get rid of the patch
Konstantin submitted a patch to -hackers already: https://commitfest.postgresql.org/47/4643/. Get that into a committable state.
## On-demand download of extensions
### How to get rid of the patch
FUSE or LD_PRELOAD trickery to intercept reads?
## Publication superuser checks
We have hacked CreatePublication so that also neon_superuser can create them.
### How to get rid of the patch
Create an upstream patch with more fine-grained privileges for publications CREATE/DROP that can be GRANTed to users.
## WAL log replication slots
### How to get rid of the patch
Utilize the upcoming v17 "slot sync worker", or a similar neon-specific background worker process, to periodically WAL-log the slots, or to export them somewhere else.
## WAL-log replication snapshots
### How to get rid of the patch
WAL-log them periodically, from a backgound worker.
## WAL-log relmapper files
Similarly to replications snapshot files, the CID mapping files generated during VACUUM FULL of a catalog table are WAL-logged
### How to get rid of the patch
WAL-log them periodically, from a backgound worker.
## XLogWaitForReplayOf()
??
# Not currently committed but proposed
## Disable ring buffer buffer manager strategies
@@ -472,23 +481,10 @@ hint bits are set. Wal logging hint bits updates requires FPI which significantl
Add special WAL record for setting page hints.
## Prefetching
### Why?
As far as pages in Neon are loaded on demand, to reduce node startup time
and also speedup some massive queries we need some mechanism for bulk loading to
reduce page request round-trip overhead.
Currently Postgres is supporting prefetching only for bitmap scan.
In Neon we should also use prefetch for sequential and index scans, because the OS is not doing it for us.
For sequential scan we could prefetch some number of following pages. For index scan we could prefetch pages
of heap relation addressed by TIDs.
## Prewarming
### Why?
Short downtime (or, in other words, fast compute node restart time) is one of the key feature of Zenith.
Short downtime (or, in other words, fast compute node restart time) is one of the key feature of Neon.
But overhead of request-response round-trip for loading pages on demand can make started node warm-up quite slow.
We can capture state of compute node buffer cache and send bulk request for this pages at startup.

View File

@@ -101,11 +101,12 @@ or
```toml
[remote_storage]
container_name = 'some-container-name'
storage_account = 'somestorageaccnt'
container_region = 'us-east'
prefix_in_container = '/test-prefix/'
```
`AZURE_STORAGE_ACCOUNT` and `AZURE_STORAGE_ACCESS_KEY` env variables can be used to specify the azure credentials if needed.
The `AZURE_STORAGE_ACCESS_KEY` env variable can be used to specify the azure credentials if needed.
## Repository background tasks

View File

@@ -177,6 +177,20 @@ serde_with::serde_conv!(
|value: String| -> Result<_, humantime::TimestampError> { humantime::parse_rfc3339(&value) }
);
impl LsnLease {
/// The default length for an explicit LSN lease request (10 minutes).
pub const DEFAULT_LENGTH: Duration = Duration::from_secs(10 * 60);
/// The default length for an implicit LSN lease granted during
/// `get_lsn_by_timestamp` request (1 minutes).
pub const DEFAULT_LENGTH_FOR_TS: Duration = Duration::from_secs(60);
/// Checks whether the lease is expired.
pub fn is_expired(&self, now: &SystemTime) -> bool {
now > &self.valid_until
}
}
/// The only [`TenantState`] variants we could be `TenantState::Activating` from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum ActivatingFrom {
@@ -322,6 +336,8 @@ pub struct TenantConfig {
pub timeline_get_throttle: Option<ThrottleConfig>,
pub image_layer_creation_check_threshold: Option<u8>,
pub switch_aux_file_policy: Option<AuxFilePolicy>,
pub lsn_lease_length: Option<String>,
pub lsn_lease_length_for_ts: Option<String>,
}
/// The policy for the aux file storage. It can be switched through `switch_aux_file_policy`

View File

@@ -54,7 +54,10 @@ impl AzureBlobStorage {
azure_config.container_name
);
let account = env::var("AZURE_STORAGE_ACCOUNT").expect("missing AZURE_STORAGE_ACCOUNT");
// Use the storage account from the config by default, fall back to env var if not present.
let account = azure_config.storage_account.clone().unwrap_or_else(|| {
env::var("AZURE_STORAGE_ACCOUNT").expect("missing AZURE_STORAGE_ACCOUNT")
});
// If the `AZURE_STORAGE_ACCESS_KEY` env var has an access key, use that,
// otherwise try the token based credentials.

View File

@@ -466,7 +466,11 @@ impl GenericRemoteStorage {
Self::AwsS3(Arc::new(S3Bucket::new(s3_config, timeout)?))
}
RemoteStorageKind::AzureContainer(azure_config) => {
info!("Using azure container '{}' in region '{}' as a remote storage, prefix in container: '{:?}'",
let storage_account = azure_config
.storage_account
.as_deref()
.unwrap_or("<AZURE_STORAGE_ACCOUNT>");
info!("Using azure container '{}' in account '{storage_account}' in region '{}' as a remote storage, prefix in container: '{:?}'",
azure_config.container_name, azure_config.container_region, azure_config.prefix_in_container);
Self::AzureBlob(Arc::new(AzureBlobStorage::new(azure_config, timeout)?))
}
@@ -589,6 +593,8 @@ impl Debug for S3Config {
pub struct AzureConfig {
/// Name of the container to connect to.
pub container_name: String,
/// Name of the storage account the container is inside of
pub storage_account: Option<String>,
/// The region where the bucket is located at.
pub container_region: String,
/// A "subfolder" in the container, to use the same container separately by multiple remote storage users at once.
@@ -603,8 +609,9 @@ impl Debug for AzureConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AzureConfig")
.field("bucket_name", &self.container_name)
.field("storage_account", &self.storage_account)
.field("bucket_region", &self.container_region)
.field("prefix_in_bucket", &self.prefix_in_container)
.field("prefix_in_container", &self.prefix_in_container)
.field("concurrency_limit", &self.concurrency_limit)
.field(
"max_keys_per_list_response",
@@ -718,6 +725,12 @@ impl RemoteStorageConfig {
(None, None, None, Some(container_name), Some(container_region)) => {
RemoteStorageKind::AzureContainer(AzureConfig {
container_name: parse_toml_string("container_name", container_name)?,
storage_account: toml
.get("storage_account")
.map(|storage_account| {
parse_toml_string("storage_account", storage_account)
})
.transpose()?,
container_region: parse_toml_string("container_region", container_region)?,
prefix_in_container: toml
.get("prefix_in_container")

View File

@@ -212,6 +212,7 @@ fn create_azure_client(
let remote_storage_config = RemoteStorageConfig {
storage: RemoteStorageKind::AzureContainer(AzureConfig {
container_name: remote_storage_azure_container,
storage_account: None,
container_region: remote_storage_azure_region,
prefix_in_container: Some(format!("test_{millis}_{random:08x}/")),
concurrency_limit: NonZeroUsize::new(100).unwrap(),

View File

@@ -1730,7 +1730,7 @@ async fn lsn_lease_handler(
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
let result = timeline
.make_lsn_lease(lsn, &ctx)
.make_lsn_lease(lsn, timeline.get_lsn_lease_length(), &ctx)
.map_err(|e| ApiError::InternalServerError(e.context("lsn lease http handler")))?;
json_response(StatusCode::OK, result)

View File

@@ -935,7 +935,7 @@ impl PageServerHandler {
let timeline = self
.get_active_tenant_timeline(tenant_shard_id.tenant_id, timeline_id, shard_selector)
.await?;
let lease = timeline.make_lsn_lease(lsn, ctx)?;
let lease = timeline.make_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)?;
let valid_until = lease
.valid_until
.duration_since(SystemTime::UNIX_EPOCH)

View File

@@ -240,6 +240,7 @@ pub struct GcResult {
pub layers_needed_by_cutoff: u64,
pub layers_needed_by_pitr: u64,
pub layers_needed_by_branches: u64,
pub layers_needed_by_leases: u64,
pub layers_not_updated: u64,
pub layers_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files.
@@ -269,6 +270,7 @@ impl AddAssign for GcResult {
self.layers_needed_by_pitr += other.layers_needed_by_pitr;
self.layers_needed_by_cutoff += other.layers_needed_by_cutoff;
self.layers_needed_by_branches += other.layers_needed_by_branches;
self.layers_needed_by_leases += other.layers_needed_by_leases;
self.layers_not_updated += other.layers_not_updated;
self.layers_removed += other.layers_removed;

View File

@@ -31,6 +31,7 @@ use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use remote_storage::TimeoutOrCancel;
use std::fmt;
use std::time::SystemTime;
use storage_broker::BrokerClientChannel;
use tokio::io::BufReader;
use tokio::sync::watch;
@@ -65,9 +66,9 @@ use self::timeline::uninit::TimelineCreateGuard;
use self::timeline::uninit::TimelineExclusionError;
use self::timeline::uninit::UninitializedTimeline;
use self::timeline::EvictionTaskTenantState;
use self::timeline::GcCutoffs;
use self::timeline::TimelineResources;
use self::timeline::WaitLsnError;
use self::timeline::{GcCutoffs, GcInfo};
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueueClient;
@@ -2428,6 +2429,13 @@ impl Tenant {
}
}
pub fn get_lsn_lease_length(&self) -> Duration {
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
tenant_conf
.lsn_lease_length
.unwrap_or(self.conf.default_tenant_conf.lsn_lease_length)
}
pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
// Use read-copy-update in order to avoid overwriting the location config
// state if this races with [`Tenant::set_new_location_config`]. Note that
@@ -3010,12 +3018,13 @@ impl Tenant {
{
let mut target = timeline.gc_info.write().unwrap();
let now = SystemTime::now();
target.leases.retain(|_, lease| !lease.is_expired(&now));
match gc_cutoffs.remove(&timeline.timeline_id) {
Some(cutoffs) => {
*target = GcInfo {
retain_lsns: branchpoints,
cutoffs,
};
target.retain_lsns = branchpoints;
target.cutoffs = cutoffs;
}
None => {
// reasons for this being unavailable:
@@ -3833,6 +3842,8 @@ pub(crate) mod harness {
tenant_conf.image_layer_creation_check_threshold,
),
switch_aux_file_policy: Some(tenant_conf.switch_aux_file_policy),
lsn_lease_length: Some(tenant_conf.lsn_lease_length),
lsn_lease_length_for_ts: Some(tenant_conf.lsn_lease_length_for_ts),
}
}
}
@@ -6939,4 +6950,93 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_lsn_lease() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_lsn_lease")?.load().await;
let key = Key::from_hex("010000000033333333444444445500000000").unwrap();
let end_lsn = Lsn(0x100);
let image_layers = (0x20..=0x90)
.step_by(0x10)
.map(|n| {
(
Lsn(n),
vec![(key, test_img(&format!("data key at {:x}", n)))],
)
})
.collect();
let timeline = tenant
.create_test_timeline_with_layers(
TIMELINE_ID,
Lsn(0x10),
DEFAULT_PG_VERSION,
&ctx,
Vec::new(),
image_layers,
end_lsn,
)
.await?;
let leased_lsns = [0x30, 0x50, 0x70];
let mut leases = Vec::new();
let _: anyhow::Result<_> = leased_lsns.iter().try_for_each(|n| {
leases.push(timeline.make_lsn_lease(Lsn(*n), timeline.get_lsn_lease_length(), &ctx)?);
Ok(())
});
// Renewing with shorter lease should not change the lease.
let updated_lease_0 =
timeline.make_lsn_lease(Lsn(leased_lsns[0]), Duration::from_secs(0), &ctx)?;
assert_eq!(updated_lease_0.valid_until, leases[0].valid_until);
// Renewing with a long lease should renew lease with later expiration time.
let updated_lease_1 = timeline.make_lsn_lease(
Lsn(leased_lsns[1]),
timeline.get_lsn_lease_length() * 2,
&ctx,
)?;
assert!(updated_lease_1.valid_until > leases[1].valid_until);
// Force set disk consistent lsn so we can get the cutoff at `end_lsn`.
info!(
"latest_gc_cutoff_lsn: {}",
*timeline.get_latest_gc_cutoff_lsn()
);
timeline.force_set_disk_consistent_lsn(end_lsn);
let res = tenant
.gc_iteration(
Some(TIMELINE_ID),
0,
Duration::ZERO,
&CancellationToken::new(),
&ctx,
)
.await?;
// Keeping everything <= Lsn(0x80) b/c leases:
// 0/10: initdb layer
// (0/20..=0/70).step_by(0x10): image layers added when creating the timeline.
assert_eq!(res.layers_needed_by_leases, 7);
// Keeping 0/90 b/c it is the latest layer.
assert_eq!(res.layers_not_updated, 1);
// Removed 0/80.
assert_eq!(res.layers_removed, 1);
// Make lease on a already GC-ed LSN.
// 0/80 does not have a valid lease + is below latest_gc_cutoff
assert!(Lsn(0x80) < *timeline.get_latest_gc_cutoff_lsn());
let res = timeline.make_lsn_lease(Lsn(0x80), timeline.get_lsn_lease_length(), &ctx);
assert!(res.is_err());
// Should still be able to renew a currently valid lease
// Assumption: original lease to is still valid for 0/50.
let _ =
timeline.make_lsn_lease(Lsn(leased_lsns[1]), timeline.get_lsn_lease_length(), &ctx)?;
Ok(())
}
}

View File

@@ -13,6 +13,7 @@ use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::CompactionAlgorithm;
use pageserver_api::models::CompactionAlgorithmSettings;
use pageserver_api::models::EvictionPolicy;
use pageserver_api::models::LsnLease;
use pageserver_api::models::{self, ThrottleConfig};
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
use serde::de::IntoDeserializer;
@@ -377,6 +378,16 @@ pub struct TenantConf {
/// There is a `last_aux_file_policy` flag which gets persisted in `index_part.json` once the first aux
/// file is written.
pub switch_aux_file_policy: AuxFilePolicy,
/// The length for an explicit LSN lease request.
/// Layers needed to reconstruct pages at LSN will not be GC-ed during this interval.
#[serde(with = "humantime_serde")]
pub lsn_lease_length: Duration,
/// The length for an implicit LSN lease granted as part of `get_lsn_by_timestamp` request.
/// Layers needed to reconstruct pages at LSN will not be GC-ed during this interval.
#[serde(with = "humantime_serde")]
pub lsn_lease_length_for_ts: Duration,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -476,6 +487,16 @@ pub struct TenantConfOpt {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub switch_aux_file_policy: Option<AuxFilePolicy>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "humantime_serde")]
#[serde(default)]
pub lsn_lease_length: Option<Duration>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "humantime_serde")]
#[serde(default)]
pub lsn_lease_length_for_ts: Option<Duration>,
}
impl TenantConfOpt {
@@ -538,6 +559,12 @@ impl TenantConfOpt {
switch_aux_file_policy: self
.switch_aux_file_policy
.unwrap_or(global_conf.switch_aux_file_policy),
lsn_lease_length: self
.lsn_lease_length
.unwrap_or(global_conf.lsn_lease_length),
lsn_lease_length_for_ts: self
.lsn_lease_length_for_ts
.unwrap_or(global_conf.lsn_lease_length_for_ts),
}
}
}
@@ -582,6 +609,8 @@ impl Default for TenantConf {
timeline_get_throttle: crate::tenant::throttle::Config::disabled(),
image_layer_creation_check_threshold: DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD,
switch_aux_file_policy: AuxFilePolicy::default_tenant_config(),
lsn_lease_length: LsnLease::DEFAULT_LENGTH,
lsn_lease_length_for_ts: LsnLease::DEFAULT_LENGTH_FOR_TS,
}
}
}
@@ -657,6 +686,8 @@ impl From<TenantConfOpt> for models::TenantConfig {
timeline_get_throttle: value.timeline_get_throttle.map(ThrottleConfig::from),
image_layer_creation_check_threshold: value.image_layer_creation_check_threshold,
switch_aux_file_policy: value.switch_aux_file_policy,
lsn_lease_length: value.lsn_lease_length.map(humantime),
lsn_lease_length_for_ts: value.lsn_lease_length_for_ts.map(humantime),
}
}
}

View File

@@ -22,7 +22,7 @@ use async_stream::try_stream;
use byteorder::{ReadBytesExt, BE};
use bytes::{BufMut, Bytes, BytesMut};
use either::Either;
use futures::Stream;
use futures::{Stream, StreamExt};
use hex;
use std::{
cmp::Ordering,
@@ -259,6 +259,16 @@ where
Ok(result)
}
pub fn iter<'a>(
&'a self,
start_key: &'a [u8; L],
ctx: &'a RequestContext,
) -> DiskBtreeIterator<'a> {
DiskBtreeIterator {
stream: Box::pin(self.get_stream_from(start_key, ctx)),
}
}
/// Return a stream which yields all key, value pairs from the index
/// starting from the first key greater or equal to `start_key`.
///
@@ -496,6 +506,19 @@ where
}
}
pub struct DiskBtreeIterator<'a> {
#[allow(clippy::type_complexity)]
stream: std::pin::Pin<
Box<dyn Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a>,
>,
}
impl<'a> DiskBtreeIterator<'a> {
pub async fn next(&mut self) -> Option<std::result::Result<(Vec<u8>, u64), DiskBtreeError>> {
self.stream.next().await
}
}
///
/// Public builder object, for creating a new tree.
///
@@ -1088,6 +1111,17 @@ pub(crate) mod tests {
== all_data.get(&u128::MAX).cloned()
);
// Test iterator and get_stream API
let mut iter = reader.iter(&[0; 16], &ctx);
let mut cnt = 0;
while let Some(res) = iter.next().await {
let (key, val) = res?;
let key = u128::from_be_bytes(key.as_slice().try_into().unwrap());
assert_eq!(val, *all_data.get(&key).unwrap());
cnt += 1;
}
assert_eq!(cnt, all_data.len());
Ok(())
}

View File

@@ -346,6 +346,7 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
// cutoff specified as time.
let ctx =
RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let mut first = true;
loop {
tokio::select! {
@@ -362,6 +363,14 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
if first {
first = false;
if delay_by_lease_length(tenant.get_lsn_lease_length(), &cancel)
.await
.is_err()
{
break;
}
if random_init_delay(period, &cancel).await.is_err() {
break;
}
@@ -531,6 +540,21 @@ pub(crate) async fn random_init_delay(
}
}
/// Delays GC by defaul lease length at restart.
///
/// We do this as the leases mapping are not persisted to disk. By delaying GC by default
/// length, we gurantees that all the leases we granted before the restart will expire
/// when we run GC for the first time after the restart.
pub(crate) async fn delay_by_lease_length(
length: Duration,
cancel: &CancellationToken,
) -> Result<(), Cancelled> {
match tokio::time::timeout(length, cancel.cancelled()).await {
Ok(_) => Err(Cancelled),
Err(_) => Ok(()),
}
}
/// Attention: the `task` and `period` beocme labels of a pageserver-wide prometheus metric.
pub(crate) fn warn_when_period_overrun(
elapsed: Duration,

View File

@@ -47,7 +47,6 @@ use utils::{
vec_map::VecMap,
};
use std::ops::{Deref, Range};
use std::pin::pin;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::{Arc, Mutex, RwLock, Weak};
@@ -61,6 +60,10 @@ use std::{
cmp::{max, min, Ordering},
ops::ControlFlow,
};
use std::{
collections::btree_map::Entry,
ops::{Deref, Range},
};
use crate::metrics::GetKind;
use crate::pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS;
@@ -454,6 +457,9 @@ pub(crate) struct GcInfo {
/// The cutoff coordinates, which are combined by selecting the minimum.
pub(crate) cutoffs: GcCutoffs,
/// Leases granted to particular LSNs.
pub(crate) leases: BTreeMap<Lsn, LsnLease>,
}
impl GcInfo {
@@ -1555,17 +1561,46 @@ impl Timeline {
Ok(())
}
/// Obtains a temporary lease blocking garbage collection for the given LSN
/// Obtains a temporary lease blocking garbage collection for the given LSN.
///
/// This function will error if the requesting LSN is less than the `latest_gc_cutoff_lsn` and there is also
/// no existing lease to renew. If there is an existing lease in the map, the lease will be renewed only if
/// the request extends the lease. The returned lease is therefore the maximum between the existing lease and
/// the requesting lease.
pub(crate) fn make_lsn_lease(
&self,
_lsn: Lsn,
lsn: Lsn,
length: Duration,
_ctx: &RequestContext,
) -> anyhow::Result<LsnLease> {
const LEASE_LENGTH: Duration = Duration::from_secs(5 * 60);
let lease = LsnLease {
valid_until: SystemTime::now() + LEASE_LENGTH,
let lease = {
let mut gc_info = self.gc_info.write().unwrap();
let valid_until = SystemTime::now() + length;
let entry = gc_info.leases.entry(lsn);
let lease = {
if let Entry::Occupied(mut occupied) = entry {
let existing_lease = occupied.get_mut();
if valid_until > existing_lease.valid_until {
existing_lease.valid_until = valid_until;
}
existing_lease.clone()
} else {
// Reject already GC-ed LSN (lsn < latest_gc_cutoff)
let latest_gc_cutoff_lsn = self.get_latest_gc_cutoff_lsn();
if lsn < *latest_gc_cutoff_lsn {
bail!("tried to request a page version that was garbage collected. requested at {} gc cutoff {}", lsn, *latest_gc_cutoff_lsn);
}
entry.or_insert(LsnLease { valid_until }).clone()
}
};
lease
};
// TODO: dummy implementation
Ok(lease)
}
@@ -2082,6 +2117,24 @@ const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
// Private functions
impl Timeline {
pub(crate) fn get_lsn_lease_length(&self) -> Duration {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.lsn_lease_length
.unwrap_or(self.conf.default_tenant_conf.lsn_lease_length)
}
// TODO(yuchen): remove unused flag after implementing https://github.com/neondatabase/neon/issues/8072
#[allow(unused)]
pub(crate) fn get_lsn_lease_length_for_ts(&self) -> Duration {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.lsn_lease_length_for_ts
.unwrap_or(self.conf.default_tenant_conf.lsn_lease_length_for_ts)
}
pub(crate) fn get_switch_aux_file_policy(&self) -> AuxFilePolicy {
let tenant_conf = self.tenant_conf.load();
tenant_conf
@@ -4907,13 +4960,25 @@ impl Timeline {
return Err(GcError::TimelineCancelled);
}
let (horizon_cutoff, pitr_cutoff, retain_lsns) = {
let (horizon_cutoff, pitr_cutoff, retain_lsns, max_lsn_with_valid_lease) = {
let gc_info = self.gc_info.read().unwrap();
let horizon_cutoff = min(gc_info.cutoffs.horizon, self.get_disk_consistent_lsn());
let pitr_cutoff = gc_info.cutoffs.pitr;
let retain_lsns = gc_info.retain_lsns.clone();
(horizon_cutoff, pitr_cutoff, retain_lsns)
// Gets the maximum LSN that holds the valid lease.
//
// Caveat: `refresh_gc_info` is in charged of updating the lease map.
// Here, we do not check for stale leases again.
let max_lsn_with_valid_lease = gc_info.leases.last_key_value().map(|(lsn, _)| *lsn);
(
horizon_cutoff,
pitr_cutoff,
retain_lsns,
max_lsn_with_valid_lease,
)
};
let mut new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
@@ -4944,7 +5009,13 @@ impl Timeline {
.set(Lsn::INVALID.0 as i64);
let res = self
.gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff)
.gc_timeline(
horizon_cutoff,
pitr_cutoff,
retain_lsns,
max_lsn_with_valid_lease,
new_gc_cutoff,
)
.instrument(
info_span!("gc_timeline", timeline_id = %self.timeline_id, cutoff = %new_gc_cutoff),
)
@@ -4961,6 +5032,7 @@ impl Timeline {
horizon_cutoff: Lsn,
pitr_cutoff: Lsn,
retain_lsns: Vec<Lsn>,
max_lsn_with_valid_lease: Option<Lsn>,
new_gc_cutoff: Lsn,
) -> Result<GcResult, GcError> {
// FIXME: if there is an ongoing detach_from_ancestor, we should just skip gc
@@ -5009,7 +5081,8 @@ impl Timeline {
// 1. it is older than cutoff LSN;
// 2. it is older than PITR interval;
// 3. it doesn't need to be retained for 'retain_lsns';
// 4. newer on-disk image layers cover the layer's whole key range
// 4. it does not need to be kept for LSNs holding valid leases.
// 5. newer on-disk image layers cover the layer's whole key range
//
// TODO holding a write lock is too agressive and avoidable
let mut guard = self.layers.write().await;
@@ -5060,7 +5133,21 @@ impl Timeline {
}
}
// 4. Is there a later on-disk layer for this relation?
// 4. Is there a valid lease that requires us to keep this layer?
if let Some(lsn) = &max_lsn_with_valid_lease {
// keep if layer start <= any of the lease
if &l.get_lsn_range().start <= lsn {
debug!(
"keeping {} because there is a valid lease preventing GC at {}",
l.layer_name(),
lsn,
);
result.layers_needed_by_leases += 1;
continue 'outer;
}
}
// 5. Is there a later on-disk layer for this relation?
//
// The end-LSN is exclusive, while disk_consistent_lsn is
// inclusive. For example, if disk_consistent_lsn is 100, it is
@@ -5438,6 +5525,11 @@ impl Timeline {
self.last_record_lsn.advance(new_lsn);
}
#[cfg(test)]
pub(super) fn force_set_disk_consistent_lsn(&self, new_value: Lsn) {
self.disk_consistent_lsn.store(new_value);
}
/// Force create an image layer and place it into the layer map.
///
/// DO NOT use this function directly. Use [`Tenant::branch_timeline_test_with_layers`]

View File

@@ -286,6 +286,7 @@ WalProposerPoll(WalProposer *wp)
void
WalProposerStart(WalProposer *wp)
{
/* Initiate connections to all safekeeper nodes */
for (int i = 0; i < wp->n_safekeepers; i++)
{

View File

@@ -63,8 +63,6 @@ char *wal_acceptors_list = "";
int wal_acceptor_reconnect_timeout = 1000;
int wal_acceptor_connection_timeout = 10000;
/* Set to true in the walproposer bgw. */
static bool am_walproposer;
static WalproposerShmemState *walprop_shared;
static WalProposerConfig walprop_config;
static XLogRecPtr sentPtr = InvalidXLogRecPtr;
@@ -78,7 +76,6 @@ static HotStandbyFeedback agg_hs_feedback;
static void nwp_shmem_startup_hook(void);
static void nwp_register_gucs(void);
static void assign_neon_safekeepers(const char *newval, void *extra);
static void nwp_prepare_shmem(void);
static uint64 backpressure_lag_impl(void);
static bool backpressure_throttling_impl(void);
@@ -103,24 +100,18 @@ static void StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd);
static void WalSndLoop(WalProposer *wp);
static void XLogBroadcastWalProposer(WalProposer *wp);
static void XLogWalPropWrite(WalProposer *wp, char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalPropClose(XLogRecPtr recptr);
static void add_nwr_event_set(Safekeeper *sk, uint32 events);
static void update_nwr_event_set(Safekeeper *sk, uint32 events);
static void rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk);
static void CheckGracefulShutdown(WalProposer *wp);
static XLogRecPtr GetLogRepRestartLSN(WalProposer *wp);
static void
init_walprop_config(bool syncSafekeepers)
{
walprop_config.neon_tenant = neon_tenant;
walprop_config.neon_timeline = neon_timeline;
/* WalProposerCreate scribbles directly on it, so pstrdup */
walprop_config.safekeepers_list = pstrdup(wal_acceptors_list);
walprop_config.safekeepers_list = wal_acceptors_list;
walprop_config.safekeeper_reconnect_timeout = wal_acceptor_reconnect_timeout;
walprop_config.safekeeper_connection_timeout = wal_acceptor_connection_timeout;
walprop_config.wal_segment_size = wal_segment_size;
@@ -160,7 +151,6 @@ WalProposerMain(Datum main_arg)
init_walprop_config(false);
walprop_pg_init_bgworker();
am_walproposer = true;
walprop_pg_load_libpqwalreceiver();
wp = WalProposerCreate(&walprop_config, walprop_pg);
@@ -199,10 +189,10 @@ nwp_register_gucs(void)
NULL, /* long_desc */
&wal_acceptors_list, /* valueAddr */
"", /* bootValue */
PGC_SIGHUP,
PGC_POSTMASTER,
GUC_LIST_INPUT, /* extensions can't use*
* GUC_LIST_QUOTE */
NULL, assign_neon_safekeepers, NULL);
NULL, NULL, NULL);
DefineCustomIntVariable(
"neon.safekeeper_reconnect_timeout",
@@ -225,33 +215,6 @@ nwp_register_gucs(void)
NULL, NULL, NULL);
}
/*
* GUC assign_hook for neon.safekeepers. Restarts walproposer through FATAL if
* the list changed.
*/
static void
assign_neon_safekeepers(const char *newval, void *extra)
{
if (!am_walproposer)
return;
if (!newval) {
/* should never happen */
wpg_log(FATAL, "neon.safekeepers is empty");
}
/*
* TODO: restarting through FATAL is stupid and introduces 1s delay before
* next bgw start. We should refactor walproposer to allow graceful exit and
* thus remove this delay.
*/
if (strcmp(wal_acceptors_list, newval) != 0)
{
wpg_log(FATAL, "restarting walproposer to change safekeeper list from %s to %s",
wal_acceptors_list, newval);
}
}
/* Check if we need to suspend inserts because of lagging replication. */
static uint64
backpressure_lag_impl(void)
@@ -400,7 +363,7 @@ walprop_register_bgworker(void)
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "WalProposerMain");
snprintf(bgw.bgw_name, BGW_MAXLEN, "WAL proposer");
snprintf(bgw.bgw_type, BGW_MAXLEN, "WAL proposer");
bgw.bgw_restart_time = 1;
bgw.bgw_restart_time = 5;
bgw.bgw_notify_pid = 0;
bgw.bgw_main_arg = (Datum) 0;
@@ -1268,10 +1231,13 @@ StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd)
static void
WalSndLoop(WalProposer *wp)
{
XLogRecPtr flushPtr;
/* Clear any already-pending wakeups */
ResetLatch(MyLatch);
for (;;)
{
CHECK_FOR_INTERRUPTS();
XLogBroadcastWalProposer(wp);
WalProposerPoll(wp);
}
@@ -1360,8 +1326,9 @@ XLogBroadcastWalProposer(WalProposer *wp)
}
/*
Used to download WAL before basebackup for logical walsenders from sk, no longer
needed because walsender always uses neon_walreader.
Used to download WAL before basebackup for walproposer/logical walsenders. No
longer used, replaced by neon_walreader; but callback still exists because
simulation tests use it.
*/
static bool
WalProposerRecovery(WalProposer *wp, Safekeeper *sk)
@@ -1369,136 +1336,6 @@ WalProposerRecovery(WalProposer *wp, Safekeeper *sk)
return true;
}
/*
* These variables are used similarly to openLogFile/SegNo,
* but for walproposer to write the XLOG during recovery. walpropFileTLI is the TimeLineID
* corresponding the filename of walpropFile.
*/
static int walpropFile = -1;
static TimeLineID walpropFileTLI = 0;
static XLogSegNo walpropSegNo = 0;
/*
* Write XLOG data to disk.
*/
static void
XLogWalPropWrite(WalProposer *wp, char *buf, Size nbytes, XLogRecPtr recptr)
{
int startoff;
int byteswritten;
/*
* Apart from walproposer, basebackup LSN page is also written out by
* postgres itself which writes WAL only in pages, and in basebackup it is
* inherently dummy (only safekeepers have historic WAL). Update WAL
* buffers here to avoid dummy page overwriting correct one we download
* here. Ugly, but alternatives are about the same ugly. We won't need
* that if we switch to on-demand WAL download from safekeepers, without
* writing to disk.
*
* https://github.com/neondatabase/neon/issues/5749
*/
if (!wp->config->syncSafekeepers)
XLogUpdateWalBuffers(buf, recptr, nbytes);
while (nbytes > 0)
{
int segbytes;
/* Close the current segment if it's completed */
if (walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size))
XLogWalPropClose(recptr);
if (walpropFile < 0)
{
#if PG_VERSION_NUM >= 150000
/* FIXME Is it ok to use hardcoded value here? */
TimeLineID tli = 1;
#else
bool use_existent = true;
#endif
/* Create/use new log file */
XLByteToSeg(recptr, walpropSegNo, wal_segment_size);
#if PG_VERSION_NUM >= 150000
walpropFile = XLogFileInit(walpropSegNo, tli);
walpropFileTLI = tli;
#else
walpropFile = XLogFileInit(walpropSegNo, &use_existent, false);
walpropFileTLI = ThisTimeLineID;
#endif
}
/* Calculate the start offset of the received logs */
startoff = XLogSegmentOffset(recptr, wal_segment_size);
if (startoff + nbytes > wal_segment_size)
segbytes = wal_segment_size - startoff;
else
segbytes = nbytes;
/* OK to write the logs */
errno = 0;
byteswritten = pg_pwrite(walpropFile, buf, segbytes, (off_t) startoff);
if (byteswritten <= 0)
{
char xlogfname[MAXFNAMELEN];
int save_errno;
/* if write didn't set errno, assume no disk space */
if (errno == 0)
errno = ENOSPC;
save_errno = errno;
XLogFileName(xlogfname, walpropFileTLI, walpropSegNo, wal_segment_size);
errno = save_errno;
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not write to log segment %s "
"at offset %u, length %lu: %m",
xlogfname, startoff, (unsigned long) segbytes)));
}
/* Update state for write */
recptr += byteswritten;
nbytes -= byteswritten;
buf += byteswritten;
}
/*
* Close the current segment if it's fully written up in the last cycle of
* the loop.
*/
if (walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size))
{
XLogWalPropClose(recptr);
}
}
/*
* Close the current segment.
*/
static void
XLogWalPropClose(XLogRecPtr recptr)
{
Assert(walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size));
if (close(walpropFile) != 0)
{
char xlogfname[MAXFNAMELEN];
XLogFileName(xlogfname, walpropFileTLI, walpropSegNo, wal_segment_size);
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not close log segment %s: %m",
xlogfname)));
}
walpropFile = -1;
}
static void
walprop_pg_wal_reader_allocate(Safekeeper *sk)
{
@@ -1802,20 +1639,6 @@ walprop_pg_wait_event_set(WalProposer *wp, long timeout, Safekeeper **sk, uint32
late_cv_trigger = ConditionVariableCancelSleep();
#endif
CHECK_FOR_INTERRUPTS();
/*
* Process config if requested. This restarts walproposer if safekeepers
* list changed. Don't do that for sync-safekeepers because quite probably
* it (re-reading config) won't work without some effort, and
* sync-safekeepers should be quick to finish anyway.
*/
if (!wp->config->syncSafekeepers && ConfigReloadPending)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
}
/*
* If wait is terminated by latch set (walsenders' latch is set on each
* wal flush). (no need for pm death check due to WL_EXIT_ON_PM_DEATH)
@@ -2028,58 +1851,6 @@ walprop_pg_log_internal(WalProposer *wp, int level, const char *line)
elog(FATAL, "unexpected log_internal message at level %d: %s", level, line);
}
static XLogRecPtr
GetLogRepRestartLSN(WalProposer *wp)
{
FILE *f;
XLogRecPtr lrRestartLsn = InvalidXLogRecPtr;
/* We don't need to do anything in syncSafekeepers mode. */
if (wp->config->syncSafekeepers)
return InvalidXLogRecPtr;
/*
* If there are active logical replication subscription we need to provide
* enough WAL for their WAL senders based on th position of their
* replication slots.
*/
f = fopen("restart.lsn", "rb");
if (f != NULL)
{
size_t rc = fread(&lrRestartLsn, sizeof(lrRestartLsn), 1, f);
fclose(f);
if (rc == 1 && lrRestartLsn != InvalidXLogRecPtr)
{
uint64 download_range_mb;
wpg_log(LOG, "logical replication restart LSN %X/%X", LSN_FORMAT_ARGS(lrRestartLsn));
/*
* If we need to download more than a max_slot_wal_keep_size,
* don't do it to avoid risk of exploding pg_wal. Logical
* replication won't work until recreated, but at least compute
* would start; this also follows max_slot_wal_keep_size
* semantics.
*/
download_range_mb = (wp->propEpochStartLsn - lrRestartLsn) / MB;
if (max_slot_wal_keep_size_mb > 0 && download_range_mb >= max_slot_wal_keep_size_mb)
{
wpg_log(WARNING, "not downloading WAL for logical replication since %X/%X as max_slot_wal_keep_size=%dMB",
LSN_FORMAT_ARGS(lrRestartLsn), max_slot_wal_keep_size_mb);
return InvalidXLogRecPtr;
}
/*
* start from the beginning of the segment to fetch page headers
* verifed by XLogReader
*/
lrRestartLsn = lrRestartLsn - XLogSegmentOffset(lrRestartLsn, wal_segment_size);
}
}
return lrRestartLsn;
}
void
SetNeonCurrentClusterSize(uint64 size)
{

8
poetry.lock generated
View File

@@ -2806,13 +2806,13 @@ files = [
[[package]]
name = "urllib3"
version = "1.26.18"
version = "1.26.19"
description = "HTTP library with thread-safe connection pooling, file post, and more."
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*"
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,>=2.7"
files = [
{file = "urllib3-1.26.18-py2.py3-none-any.whl", hash = "sha256:34b97092d7e0a3a8cf7cd10e386f401b3737364026c45e622aa02903dffe0f07"},
{file = "urllib3-1.26.18.tar.gz", hash = "sha256:f8ecc1bba5667413457c529ab955bf8c67b45db799d159066261719e328580a0"},
{file = "urllib3-1.26.19-py2.py3-none-any.whl", hash = "sha256:37a0344459b199fce0e80b0d3569837ec6b6937435c5244e7fd73fa6006830f3"},
{file = "urllib3-1.26.19.tar.gz", hash = "sha256:3e3d753a8618b86d7de333b4223005f68720bcd6a7d2bcb9fbd2229ec7c1e429"},
]
[package.extras]

View File

@@ -305,7 +305,7 @@ impl ProjectInfoCacheImpl {
// acquire a random shard lock
let mut removed = 0;
let shard = self.project2ep.shards()[shard].write();
for (_, endpoints) in shard.iter() {
for (_, endpoints) in crate::rawtable::iter(&*shard) {
for endpoint in endpoints.get().iter() {
self.cache.remove(endpoint);
removed += 1;

View File

@@ -517,11 +517,18 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
);
let mut lock = shard.write();
let timer = self.metrics.reclamation_lag_seconds.start_timer();
let count = lock
.extract_if(|_, semaphore| Arc::strong_count(semaphore.get_mut()) == 1)
.count();
let mut removed = 0;
crate::rawtable::retain(&mut *lock, |_, semaphore| {
let remove = Arc::strong_count(semaphore.get_mut()) == 1;
if remove {
removed += 1;
}
!remove
});
drop(lock);
self.metrics.semaphores_unregistered.inc_by(count as u64);
self.metrics.semaphores_unregistered.inc_by(removed as u64);
timer.observe();
}
}

View File

@@ -25,6 +25,7 @@ pub mod parse;
pub mod protocol2;
pub mod proxy;
pub mod rate_limiter;
mod rawtable;
pub mod redis;
pub mod sasl;
pub mod scram;

61
proxy/src/rawtable.rs Normal file
View File

@@ -0,0 +1,61 @@
//! Dashmap moved to using RawTable for the shards.
//! Some of the APIs we used before are unsafe to access, but we can copy the implementations from the safe
//! HashMap wrappers for our needs.
// Safety info: All implementations here are taken directly from hashbrown HashMap impl.
use std::marker::PhantomData;
use hashbrown::raw;
// taken from https://docs.rs/hashbrown/0.14.5/src/hashbrown/map.rs.html#919-932
pub fn retain<K, V, F>(table: &mut raw::RawTable<(K, V)>, mut f: F)
where
F: FnMut(&K, &mut V) -> bool,
{
// SAFETY: Here we only use `iter` as a temporary, preventing use-after-free
unsafe {
for item in table.iter() {
let &mut (ref key, ref mut value) = item.as_mut();
if !f(key, value) {
table.erase(item);
}
}
}
}
// taken from https://docs.rs/hashbrown/0.14.5/src/hashbrown/map.rs.html#756-764
pub fn iter<K, V>(table: &raw::RawTable<(K, V)>) -> impl Iterator<Item = (&K, &V)> + '_ {
pub struct Iter<'a, K, V> {
inner: raw::RawIter<(K, V)>,
marker: PhantomData<(&'a K, &'a V)>,
}
impl<'a, K, V> Iterator for Iter<'a, K, V> {
type Item = (&'a K, &'a V);
#[cfg_attr(feature = "inline-more", inline)]
fn next(&mut self) -> Option<(&'a K, &'a V)> {
let x = self.inner.next()?;
// SAFETY: the borrows do not outlive the rawtable
unsafe {
let r = x.as_ref();
Some((&r.0, &r.1))
}
}
#[cfg_attr(feature = "inline-more", inline)]
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}
// SAFETY:
// > It is up to the caller to ensure that the RawTable outlives the RawIter
// Here we tie the lifetime of self to the iter.
unsafe {
Iter {
inner: table.iter(),
marker: PhantomData,
}
}
}

View File

@@ -324,7 +324,8 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
.start_timer();
let current_len = shard.len();
let mut clients_removed = 0;
shard.retain(|endpoint, x| {
crate::rawtable::retain(&mut *shard, |endpoint, x| {
// if the current endpoint pool is unique (no other strong or weak references)
// then it is currently not in use by any connections.
if let Some(pool) = Arc::get_mut(x.get_mut()) {

View File

@@ -314,7 +314,7 @@ impl ComputeHook {
if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
endpoint
.reconfigure(compute_pageservers.clone(), *stripe_size, None)
.reconfigure(compute_pageservers.clone(), *stripe_size)
.await?;
}
}

View File

@@ -1914,7 +1914,6 @@ class NeonCli(AbstractNeonCli):
endpoint_id: str,
tenant_id: Optional[TenantId] = None,
pageserver_id: Optional[int] = None,
safekeepers: Optional[List[int]] = None,
check_return_code=True,
) -> "subprocess.CompletedProcess[str]":
args = ["endpoint", "reconfigure", endpoint_id]
@@ -1922,8 +1921,6 @@ class NeonCli(AbstractNeonCli):
args.extend(["--tenant-id", str(tenant_id)])
if pageserver_id is not None:
args.extend(["--pageserver-id", str(pageserver_id)])
if safekeepers is not None:
args.extend(["--safekeepers", (",".join(map(str, safekeepers)))])
return self.raw_cli(args, check_return_code=check_return_code)
def endpoint_stop(
@@ -3410,7 +3407,6 @@ class Endpoint(PgProtocol, LogUtils):
self.pg_port = pg_port
self.http_port = http_port
self.check_stop_result = check_stop_result
# passed to endpoint create and endpoint reconfigure
self.active_safekeepers: List[int] = list(map(lambda sk: sk.id, env.safekeepers))
# path to conf is <repo_dir>/endpoints/<endpoint_id>/pgdata/postgresql.conf
@@ -3473,7 +3469,6 @@ class Endpoint(PgProtocol, LogUtils):
self,
remote_ext_config: Optional[str] = None,
pageserver_id: Optional[int] = None,
safekeepers: Optional[List[int]] = None,
allow_multiple: bool = False,
) -> "Endpoint":
"""
@@ -3483,11 +3478,6 @@ class Endpoint(PgProtocol, LogUtils):
assert self.endpoint_id is not None
# If `safekeepers` is not None, they are remember them as active and use
# in the following commands.
if safekeepers is not None:
self.active_safekeepers = safekeepers
log.info(f"Starting postgres endpoint {self.endpoint_id}")
self.env.neon_cli.endpoint_start(
@@ -3548,17 +3538,9 @@ class Endpoint(PgProtocol, LogUtils):
if self.running:
self.safe_psql("SELECT pg_reload_conf()")
def reconfigure(
self, pageserver_id: Optional[int] = None, safekeepers: Optional[List[int]] = None
):
def reconfigure(self, pageserver_id: Optional[int] = None):
assert self.endpoint_id is not None
# If `safekeepers` is not None, they are remember them as active and use
# in the following commands.
if safekeepers is not None:
self.active_safekeepers = safekeepers
self.env.neon_cli.endpoint_reconfigure(
self.endpoint_id, self.tenant_id, pageserver_id, self.active_safekeepers
)
self.env.neon_cli.endpoint_reconfigure(self.endpoint_id, self.tenant_id, pageserver_id)
def respec(self, **kwargs):
"""Update the endpoint.json file used by control_plane."""

View File

@@ -6,7 +6,7 @@
"": {
"dependencies": {
"@neondatabase/serverless": "0.9.0",
"ws": "8.16.0"
"ws": "8.17.1"
}
},
"node_modules/@neondatabase/serverless": {
@@ -96,9 +96,9 @@
}
},
"node_modules/ws": {
"version": "8.16.0",
"resolved": "https://registry.npmjs.org/ws/-/ws-8.16.0.tgz",
"integrity": "sha512-HS0c//TP7Ina87TfiPUz1rQzMhHrl/SG2guqRcTOIUYD2q8uhUdNHZYJUaQ8aTGPzCh+c6oawMKW35nFl1dxyQ==",
"version": "8.17.1",
"resolved": "https://registry.npmjs.org/ws/-/ws-8.17.1.tgz",
"integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==",
"engines": {
"node": ">=10.0.0"
},

View File

@@ -2,6 +2,6 @@
"type": "module",
"dependencies": {
"@neondatabase/serverless": "0.9.0",
"ws": "8.16.0"
"ws": "8.17.1"
}
}

View File

@@ -195,6 +195,8 @@ def test_fully_custom_config(positive_env: NeonEnv):
"walreceiver_connect_timeout": "13m",
"image_layer_creation_check_threshold": 1,
"switch_aux_file_policy": "cross-validation",
"lsn_lease_length": "1m",
"lsn_lease_length_for_ts": "5s",
}
ps_http = env.pageserver.http_client()

View File

@@ -1724,10 +1724,7 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
# Basic pull_timeline test.
# When live_sk_change is False, compute is restarted to change set of
# safekeepers; otherwise it is live reload.
@pytest.mark.parametrize("live_sk_change", [False, True])
def test_pull_timeline(neon_env_builder: NeonEnvBuilder, live_sk_change: bool):
def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
neon_env_builder.auth_enabled = True
def execute_payload(endpoint: Endpoint):
@@ -1760,7 +1757,8 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder, live_sk_change: bool):
log.info("Use only first 3 safekeepers")
env.safekeepers[3].stop()
endpoint = env.endpoints.create("main")
endpoint.start(safekeepers=[1, 2, 3])
endpoint.active_safekeepers = [1, 2, 3]
endpoint.start()
execute_payload(endpoint)
show_statuses(env.safekeepers, tenant_id, timeline_id)
@@ -1772,22 +1770,29 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder, live_sk_change: bool):
log.info("Initialize new safekeeper 4, pull data from 1 & 3")
env.safekeepers[3].start()
res = env.safekeepers[3].pull_timeline(
[env.safekeepers[0], env.safekeepers[2]], tenant_id, timeline_id
res = (
env.safekeepers[3]
.http_client(auth_token=env.auth_keys.generate_safekeeper_token())
.pull_timeline(
{
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
"http_hosts": [
f"http://localhost:{env.safekeepers[0].port.http}",
f"http://localhost:{env.safekeepers[2].port.http}",
],
}
)
)
log.info("Finished pulling timeline")
log.info(res)
show_statuses(env.safekeepers, tenant_id, timeline_id)
action = "reconfiguing" if live_sk_change else "restarting"
log.info(f"{action} compute with new config to verify that it works")
new_sks = [1, 3, 4]
if not live_sk_change:
endpoint.stop_and_destroy().create("main")
endpoint.start(safekeepers=new_sks)
else:
endpoint.reconfigure(safekeepers=new_sks)
log.info("Restarting compute with new config to verify that it works")
endpoint.stop_and_destroy().create("main")
endpoint.active_safekeepers = [1, 3, 4]
endpoint.start()
execute_payload(endpoint)
show_statuses(env.safekeepers, tenant_id, timeline_id)