mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 22:50:38 +00:00
Compare commits
10 Commits
release-pr
...
vlad/vecto
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
64ad7658e3 | ||
|
|
cffe724b01 | ||
|
|
2d7432231f | ||
|
|
58f00b83c1 | ||
|
|
0870dafc32 | ||
|
|
024f2923a6 | ||
|
|
7debd6162c | ||
|
|
c861d71eeb | ||
|
|
6e46204712 | ||
|
|
5c6d78d469 |
4
.github/workflows/build_and_test.yml
vendored
4
.github/workflows/build_and_test.yml
vendored
@@ -1132,11 +1132,9 @@ jobs:
|
||||
-f branch=main \
|
||||
-f dockerTag=${{needs.tag.outputs.build-tag}}
|
||||
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
|
||||
gh workflow --repo neondatabase/aws run deploy-prod.yml --ref main \
|
||||
gh workflow --repo neondatabase/aws run deploy-proxy-prod.yml --ref main \
|
||||
-f deployPgSniRouter=true \
|
||||
-f deployProxy=true \
|
||||
-f deployStorage=false \
|
||||
-f deployStorageBroker=false \
|
||||
-f branch=main \
|
||||
-f dockerTag=${{needs.tag.outputs.build-tag}}
|
||||
else
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
# Zenith storage node — alternative
|
||||
# Neon storage node — alternative
|
||||
|
||||
## **Design considerations**
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# Command line interface (end-user)
|
||||
|
||||
Zenith CLI as it is described here mostly resides on the same conceptual level as pg_ctl/initdb/pg_recvxlog/etc and replaces some of them in an opinionated way. I would also suggest bundling our patched postgres inside zenith distribution at least at the start.
|
||||
Neon CLI as it is described here mostly resides on the same conceptual level as pg_ctl/initdb/pg_recvxlog/etc and replaces some of them in an opinionated way. I would also suggest bundling our patched postgres inside neon distribution at least at the start.
|
||||
|
||||
This proposal is focused on managing local installations. For cluster operations, different tooling would be needed. The point of integration between the two is storage URL: no matter how complex cluster setup is it may provide an endpoint where the user may push snapshots.
|
||||
|
||||
@@ -8,40 +8,40 @@ The most important concept here is a snapshot, which can be created/pushed/pulle
|
||||
|
||||
# Possible usage scenarios
|
||||
|
||||
## Install zenith, run a postgres
|
||||
## Install neon, run a postgres
|
||||
|
||||
```
|
||||
> brew install pg-zenith
|
||||
> zenith pg create # creates pgdata with default pattern pgdata$i
|
||||
> zenith pg list
|
||||
> brew install pg-neon
|
||||
> neon pg create # creates pgdata with default pattern pgdata$i
|
||||
> neon pg list
|
||||
ID PGDATA USED STORAGE ENDPOINT
|
||||
primary1 pgdata1 0G zenith-local localhost:5432
|
||||
primary1 pgdata1 0G neon-local localhost:5432
|
||||
```
|
||||
|
||||
## Import standalone postgres to zenith
|
||||
## Import standalone postgres to neon
|
||||
|
||||
```
|
||||
> zenith snapshot import --from=basebackup://replication@localhost:5432/ oldpg
|
||||
> neon snapshot import --from=basebackup://replication@localhost:5432/ oldpg
|
||||
[====================------------] 60% | 20MB/s
|
||||
> zenith snapshot list
|
||||
> neon snapshot list
|
||||
ID SIZE PARENT
|
||||
oldpg 5G -
|
||||
|
||||
> zenith pg create --snapshot oldpg
|
||||
> neon pg create --snapshot oldpg
|
||||
Started postgres on localhost:5432
|
||||
|
||||
> zenith pg list
|
||||
> neon pg list
|
||||
ID PGDATA USED STORAGE ENDPOINT
|
||||
primary1 pgdata1 5G zenith-local localhost:5432
|
||||
primary1 pgdata1 5G neon-local localhost:5432
|
||||
|
||||
> zenith snapshot destroy oldpg
|
||||
> neon snapshot destroy oldpg
|
||||
Ok
|
||||
```
|
||||
|
||||
Also, we may start snapshot import implicitly by looking at snapshot schema
|
||||
|
||||
```
|
||||
> zenith pg create --snapshot basebackup://replication@localhost:5432/
|
||||
> neon pg create --snapshot basebackup://replication@localhost:5432/
|
||||
Downloading snapshot... Done.
|
||||
Started postgres on localhost:5432
|
||||
Destroying snapshot... Done.
|
||||
@@ -52,39 +52,39 @@ Destroying snapshot... Done.
|
||||
Since we may export the whole snapshot as one big file (tar of basebackup, maybe with some manifest) it may be shared over conventional means: http, ssh, [git+lfs](https://docs.github.com/en/github/managing-large-files/about-git-large-file-storage).
|
||||
|
||||
```
|
||||
> zenith pg create --snapshot http://learn-postgres.com/movies_db.zenith movies
|
||||
> neon pg create --snapshot http://learn-postgres.com/movies_db.neon movies
|
||||
```
|
||||
|
||||
## Create snapshot and push it to the cloud
|
||||
|
||||
```
|
||||
> zenith snapshot create pgdata1@snap1
|
||||
> zenith snapshot push --to ssh://stas@zenith.tech pgdata1@snap1
|
||||
> neon snapshot create pgdata1@snap1
|
||||
> neon snapshot push --to ssh://stas@neon.tech pgdata1@snap1
|
||||
```
|
||||
|
||||
## Rollback database to the snapshot
|
||||
|
||||
One way to rollback the database is just to init a new database from the snapshot and destroy the old one. But creating a new database from a snapshot would require a copy of that snapshot which is time consuming operation. Another option that would be cool to support is the ability to create the copy-on-write database from the snapshot without copying data, and store updated pages in a separate location, however that way would have performance implications. So to properly rollback the database to the older state we have `zenith pg checkout`.
|
||||
One way to rollback the database is just to init a new database from the snapshot and destroy the old one. But creating a new database from a snapshot would require a copy of that snapshot which is time consuming operation. Another option that would be cool to support is the ability to create the copy-on-write database from the snapshot without copying data, and store updated pages in a separate location, however that way would have performance implications. So to properly rollback the database to the older state we have `neon pg checkout`.
|
||||
|
||||
```
|
||||
> zenith pg list
|
||||
> neon pg list
|
||||
ID PGDATA USED STORAGE ENDPOINT
|
||||
primary1 pgdata1 5G zenith-local localhost:5432
|
||||
primary1 pgdata1 5G neon-local localhost:5432
|
||||
|
||||
> zenith snapshot create pgdata1@snap1
|
||||
> neon snapshot create pgdata1@snap1
|
||||
|
||||
> zenith snapshot list
|
||||
> neon snapshot list
|
||||
ID SIZE PARENT
|
||||
oldpg 5G -
|
||||
pgdata1@snap1 6G -
|
||||
pgdata1@CURRENT 6G -
|
||||
|
||||
> zenith pg checkout pgdata1@snap1
|
||||
> neon pg checkout pgdata1@snap1
|
||||
Stopping postgres on pgdata1.
|
||||
Rolling back pgdata1@CURRENT to pgdata1@snap1.
|
||||
Starting postgres on pgdata1.
|
||||
|
||||
> zenith snapshot list
|
||||
> neon snapshot list
|
||||
ID SIZE PARENT
|
||||
oldpg 5G -
|
||||
pgdata1@snap1 6G -
|
||||
@@ -99,7 +99,7 @@ Some notes: pgdata1@CURRENT -- implicit snapshot representing the current state
|
||||
PITR area acts like a continuous snapshot where you can reset the database to any point in time within this area (by area I mean some TTL period or some size limit, both possibly infinite).
|
||||
|
||||
```
|
||||
> zenith pitr create --storage s3tank --ttl 30d --name pitr_last_month
|
||||
> neon pitr create --storage s3tank --ttl 30d --name pitr_last_month
|
||||
```
|
||||
|
||||
Resetting the database to some state in past would require creating a snapshot on some lsn / time in this pirt area.
|
||||
@@ -108,29 +108,29 @@ Resetting the database to some state in past would require creating a snapshot o
|
||||
|
||||
## storage
|
||||
|
||||
Storage is either zenith pagestore or s3. Users may create a database in a pagestore and create/move *snapshots* and *pitr regions* in both pagestore and s3. Storage is a concept similar to `git remote`. After installation, I imagine one local storage is available by default.
|
||||
Storage is either neon pagestore or s3. Users may create a database in a pagestore and create/move *snapshots* and *pitr regions* in both pagestore and s3. Storage is a concept similar to `git remote`. After installation, I imagine one local storage is available by default.
|
||||
|
||||
**zenith storage attach** -t [native|s3] -c key=value -n name
|
||||
**neon storage attach** -t [native|s3] -c key=value -n name
|
||||
|
||||
Attaches/initializes storage. For --type=s3, user credentials and path should be provided. For --type=native we may support --path=/local/path and --url=zenith.tech/stas/mystore. Other possible term for native is 'zstore'.
|
||||
Attaches/initializes storage. For --type=s3, user credentials and path should be provided. For --type=native we may support --path=/local/path and --url=neon.tech/stas/mystore. Other possible term for native is 'zstore'.
|
||||
|
||||
|
||||
**zenith storage list**
|
||||
**neon storage list**
|
||||
|
||||
Show currently attached storages. For example:
|
||||
|
||||
```
|
||||
> zenith storage list
|
||||
> neon storage list
|
||||
NAME USED TYPE OPTIONS PATH
|
||||
local 5.1G zenith-local /opt/zenith/store/local
|
||||
local.compr 20.4G zenith-local compression=on /opt/zenith/store/local.compr
|
||||
zcloud 60G zenith-remote zenith.tech/stas/mystore
|
||||
local 5.1G neon-local /opt/neon/store/local
|
||||
local.compr 20.4G neon-local compression=on /opt/neon/store/local.compr
|
||||
zcloud 60G neon-remote neon.tech/stas/mystore
|
||||
s3tank 80G S3
|
||||
```
|
||||
|
||||
**zenith storage detach**
|
||||
**neon storage detach**
|
||||
|
||||
**zenith storage show**
|
||||
**neon storage show**
|
||||
|
||||
|
||||
|
||||
@@ -140,29 +140,29 @@ Manages postgres data directories and can start postgres instances with proper c
|
||||
|
||||
Pg is a term for a single postgres running on some data. I'm trying to avoid separation of datadir management and postgres instance management -- both that concepts bundled here together.
|
||||
|
||||
**zenith pg create** [--no-start --snapshot --cow] -s storage-name -n pgdata
|
||||
**neon pg create** [--no-start --snapshot --cow] -s storage-name -n pgdata
|
||||
|
||||
Creates (initializes) new data directory in given storage and starts postgres. I imagine that storage for this operation may be only local and data movement to remote location happens through snapshots/pitr.
|
||||
|
||||
--no-start: just init datadir without creating
|
||||
|
||||
--snapshot snap: init from the snapshot. Snap is a name or URL (zenith.tech/stas/mystore/snap1)
|
||||
--snapshot snap: init from the snapshot. Snap is a name or URL (neon.tech/stas/mystore/snap1)
|
||||
|
||||
--cow: initialize Copy-on-Write data directory on top of some snapshot (makes sense if it is a snapshot of currently running a database)
|
||||
|
||||
**zenith pg destroy**
|
||||
**neon pg destroy**
|
||||
|
||||
**zenith pg start** [--replica] pgdata
|
||||
**neon pg start** [--replica] pgdata
|
||||
|
||||
Start postgres with proper extensions preloaded/installed.
|
||||
|
||||
**zenith pg checkout**
|
||||
**neon pg checkout**
|
||||
|
||||
Rollback data directory to some previous snapshot.
|
||||
|
||||
**zenith pg stop** pg_id
|
||||
**neon pg stop** pg_id
|
||||
|
||||
**zenith pg list**
|
||||
**neon pg list**
|
||||
|
||||
```
|
||||
ROLE PGDATA USED STORAGE ENDPOINT
|
||||
@@ -173,7 +173,7 @@ primary my_pg2 3.2G local.compr localhost:5435
|
||||
- my_pg3 9.2G local.compr -
|
||||
```
|
||||
|
||||
**zenith pg show**
|
||||
**neon pg show**
|
||||
|
||||
```
|
||||
my_pg:
|
||||
@@ -194,7 +194,7 @@ my_pg:
|
||||
|
||||
```
|
||||
|
||||
**zenith pg start-rest/graphql** pgdata
|
||||
**neon pg start-rest/graphql** pgdata
|
||||
|
||||
Starts REST/GraphQL proxy on top of postgres master. Not sure we should do that, just an idea.
|
||||
|
||||
@@ -203,35 +203,35 @@ Starts REST/GraphQL proxy on top of postgres master. Not sure we should do that,
|
||||
|
||||
Snapshot creation is cheap -- no actual data is copied, we just start retaining old pages. Snapshot size means the amount of retained data, not all data. Snapshot name looks like pgdata_name@tag_name. tag_name is set by the user during snapshot creation. There are some reserved tag names: CURRENT represents the current state of the data directory; HEAD{i} represents the data directory state that resided in the database before i-th checkout.
|
||||
|
||||
**zenith snapshot create** pgdata_name@snap_name
|
||||
**neon snapshot create** pgdata_name@snap_name
|
||||
|
||||
Creates a new snapshot in the same storage where pgdata_name exists.
|
||||
|
||||
**zenith snapshot push** --to url pgdata_name@snap_name
|
||||
**neon snapshot push** --to url pgdata_name@snap_name
|
||||
|
||||
Produces binary stream of a given snapshot. Under the hood starts temp read-only postgres over this snapshot and sends basebackup stream. Receiving side should start `zenith snapshot recv` before push happens. If url has some special schema like zenith:// receiving side may require auth start `zenith snapshot recv` on the go.
|
||||
Produces binary stream of a given snapshot. Under the hood starts temp read-only postgres over this snapshot and sends basebackup stream. Receiving side should start `neon snapshot recv` before push happens. If url has some special schema like neon:// receiving side may require auth start `neon snapshot recv` on the go.
|
||||
|
||||
**zenith snapshot recv**
|
||||
**neon snapshot recv**
|
||||
|
||||
Starts a port listening for a basebackup stream, prints connection info to stdout (so that user may use that in push command), and expects data on that socket.
|
||||
|
||||
**zenith snapshot pull** --from url or path
|
||||
**neon snapshot pull** --from url or path
|
||||
|
||||
Connects to a remote zenith/s3/file and pulls snapshot. The remote site should be zenith service or files in our format.
|
||||
Connects to a remote neon/s3/file and pulls snapshot. The remote site should be neon service or files in our format.
|
||||
|
||||
**zenith snapshot import** --from basebackup://<...> or path
|
||||
**neon snapshot import** --from basebackup://<...> or path
|
||||
|
||||
Creates a new snapshot out of running postgres via basebackup protocol or basebackup files.
|
||||
|
||||
**zenith snapshot export**
|
||||
**neon snapshot export**
|
||||
|
||||
Starts read-only postgres over this snapshot and exports data in some format (pg_dump, or COPY TO on some/all tables). One of the options may be zenith own format which is handy for us (but I think just tar of basebackup would be okay).
|
||||
Starts read-only postgres over this snapshot and exports data in some format (pg_dump, or COPY TO on some/all tables). One of the options may be neon own format which is handy for us (but I think just tar of basebackup would be okay).
|
||||
|
||||
**zenith snapshot diff** snap1 snap2
|
||||
**neon snapshot diff** snap1 snap2
|
||||
|
||||
Shows size of data changed between two snapshots. We also may provide options to diff schema/data in tables. To do that start temp read-only postgreses.
|
||||
|
||||
**zenith snapshot destroy**
|
||||
**neon snapshot destroy**
|
||||
|
||||
## pitr
|
||||
|
||||
@@ -239,7 +239,7 @@ Pitr represents wal stream and ttl policy for that stream
|
||||
|
||||
XXX: any suggestions on a better name?
|
||||
|
||||
**zenith pitr create** name
|
||||
**neon pitr create** name
|
||||
|
||||
--ttl = inf | period
|
||||
|
||||
@@ -247,21 +247,21 @@ XXX: any suggestions on a better name?
|
||||
|
||||
--storage = storage_name
|
||||
|
||||
**zenith pitr extract-snapshot** pitr_name --lsn xxx
|
||||
**neon pitr extract-snapshot** pitr_name --lsn xxx
|
||||
|
||||
Creates a snapshot out of some lsn in PITR area. The obtained snapshot may be managed with snapshot routines (move/send/export)
|
||||
|
||||
**zenith pitr gc** pitr_name
|
||||
**neon pitr gc** pitr_name
|
||||
|
||||
Force garbage collection on some PITR area.
|
||||
|
||||
**zenith pitr list**
|
||||
**neon pitr list**
|
||||
|
||||
**zenith pitr destroy**
|
||||
**neon pitr destroy**
|
||||
|
||||
|
||||
## console
|
||||
|
||||
**zenith console**
|
||||
**neon console**
|
||||
|
||||
Opens browser targeted at web console with the more or less same functionality as described here.
|
||||
|
||||
@@ -6,7 +6,7 @@ When do we consider the WAL record as durable, so that we can
|
||||
acknowledge the commit to the client and be reasonably certain that we
|
||||
will not lose the transaction?
|
||||
|
||||
Zenith uses a group of WAL safekeeper nodes to hold the generated WAL.
|
||||
Neon uses a group of WAL safekeeper nodes to hold the generated WAL.
|
||||
A WAL record is considered durable, when it has been written to a
|
||||
majority of WAL safekeeper nodes. In this document, I use 5
|
||||
safekeepers, because I have five fingers. A WAL record is durable,
|
||||
|
||||
@@ -1,23 +1,23 @@
|
||||
# Zenith local
|
||||
# Neon local
|
||||
|
||||
Here I list some objectives to keep in mind when discussing zenith-local design and a proposal that brings all components together. Your comments on both parts are very welcome.
|
||||
Here I list some objectives to keep in mind when discussing neon-local design and a proposal that brings all components together. Your comments on both parts are very welcome.
|
||||
|
||||
#### Why do we need it?
|
||||
- For distribution - this easy to use binary will help us to build adoption among developers.
|
||||
- For internal use - to test all components together.
|
||||
|
||||
In my understanding, we consider it to be just a mock-up version of zenith-cloud.
|
||||
In my understanding, we consider it to be just a mock-up version of neon-cloud.
|
||||
> Question: How much should we care about durability and security issues for a local setup?
|
||||
|
||||
|
||||
#### Why is it better than a simple local postgres?
|
||||
|
||||
- Easy one-line setup. As simple as `cargo install zenith && zenith start`
|
||||
- Easy one-line setup. As simple as `cargo install neon && neon start`
|
||||
|
||||
- Quick and cheap creation of compute nodes over the same storage.
|
||||
> Question: How can we describe a use-case for this feature?
|
||||
|
||||
- Zenith-local can work with S3 directly.
|
||||
- Neon-local can work with S3 directly.
|
||||
|
||||
- Push and pull images (snapshots) to remote S3 to exchange data with other users.
|
||||
|
||||
@@ -31,50 +31,50 @@ Ideally, just one binary that incorporates all elements we need.
|
||||
|
||||
#### Components:
|
||||
|
||||
- **zenith-CLI** - interface for end-users. Turns commands to REST requests and handles responses to show them in a user-friendly way.
|
||||
CLI proposal is here https://github.com/libzenith/rfcs/blob/003-laptop-cli.md/003-laptop-cli.md
|
||||
WIP code is here: https://github.com/libzenith/postgres/tree/main/pageserver/src/bin/cli
|
||||
- **neon-CLI** - interface for end-users. Turns commands to REST requests and handles responses to show them in a user-friendly way.
|
||||
CLI proposal is here https://github.com/neondatabase/rfcs/blob/003-laptop-cli.md/003-laptop-cli.md
|
||||
WIP code is here: https://github.com/neondatabase/postgres/tree/main/pageserver/src/bin/cli
|
||||
|
||||
- **zenith-console** - WEB UI with same functionality as CLI.
|
||||
- **neon-console** - WEB UI with same functionality as CLI.
|
||||
>Note: not for the first release.
|
||||
|
||||
- **zenith-local** - entrypoint. Service that starts all other components and handles REST API requests. See REST API proposal below.
|
||||
> Idea: spawn all other components as child processes, so that we could shutdown everything by stopping zenith-local.
|
||||
- **neon-local** - entrypoint. Service that starts all other components and handles REST API requests. See REST API proposal below.
|
||||
> Idea: spawn all other components as child processes, so that we could shutdown everything by stopping neon-local.
|
||||
|
||||
- **zenith-pageserver** - consists of a storage and WAL-replaying service (modified PG in current implementation).
|
||||
- **neon-pageserver** - consists of a storage and WAL-replaying service (modified PG in current implementation).
|
||||
> Question: Probably, for local setup we should be able to bypass page-storage and interact directly with S3 to avoid double caching in shared buffers and page-server?
|
||||
|
||||
WIP code is here: https://github.com/libzenith/postgres/tree/main/pageserver/src
|
||||
WIP code is here: https://github.com/neondatabase/postgres/tree/main/pageserver/src
|
||||
|
||||
- **zenith-S3** - stores base images of the database and WAL in S3 object storage. Import and export images from/to zenith.
|
||||
- **neon-S3** - stores base images of the database and WAL in S3 object storage. Import and export images from/to neon.
|
||||
> Question: How should it operate in a local setup? Will we manage it ourselves or ask user to provide credentials for existing S3 object storage (i.e. minio)?
|
||||
> Question: Do we use it together with local page store or they are interchangeable?
|
||||
|
||||
WIP code is ???
|
||||
|
||||
- **zenith-safekeeper** - receives WAL from postgres, stores it durably, answers to Postgres that "sync" is succeed.
|
||||
- **neon-safekeeper** - receives WAL from postgres, stores it durably, answers to Postgres that "sync" is succeed.
|
||||
> Question: How should it operate in a local setup? In my understanding it should push WAL directly to S3 (if we use it) or store all data locally (if we use local page storage). The latter option seems meaningless (extra overhead and no gain), but it is still good to test the system.
|
||||
|
||||
WIP code is here: https://github.com/libzenith/postgres/tree/main/src/bin/safekeeper
|
||||
WIP code is here: https://github.com/neondatabase/postgres/tree/main/src/bin/safekeeper
|
||||
|
||||
- **zenith-computenode** - bottomless PostgreSQL, ideally upstream, but for a start - our modified version. User can quickly create and destroy them and work with it as a regular postgres database.
|
||||
- **neon-computenode** - bottomless PostgreSQL, ideally upstream, but for a start - our modified version. User can quickly create and destroy them and work with it as a regular postgres database.
|
||||
|
||||
WIP code is in main branch and here: https://github.com/libzenith/postgres/commits/compute_node
|
||||
WIP code is in main branch and here: https://github.com/neondatabase/postgres/commits/compute_node
|
||||
|
||||
#### REST API:
|
||||
|
||||
Service endpoint: `http://localhost:3000`
|
||||
|
||||
Resources:
|
||||
- /storages - Where data lives: zenith-pageserver or zenith-s3
|
||||
- /pgs - Postgres - zenith-computenode
|
||||
- /storages - Where data lives: neon-pageserver or neon-s3
|
||||
- /pgs - Postgres - neon-computenode
|
||||
- /snapshots - snapshots **TODO**
|
||||
|
||||
>Question: Do we want to extend this API to manage zenith components? I.e. start page-server, manage safekeepers and so on? Or they will be hardcoded to just start once and for all?
|
||||
>Question: Do we want to extend this API to manage neon components? I.e. start page-server, manage safekeepers and so on? Or they will be hardcoded to just start once and for all?
|
||||
|
||||
Methods and their mapping to CLI:
|
||||
|
||||
- /storages - zenith-pageserver or zenith-s3
|
||||
- /storages - neon-pageserver or neon-s3
|
||||
|
||||
CLI | REST API
|
||||
------------- | -------------
|
||||
@@ -84,7 +84,7 @@ storage list | GET /storages
|
||||
storage show -n name | GET /storages/:storage_name
|
||||
|
||||
|
||||
- /pgs - zenith-computenode
|
||||
- /pgs - neon-computenode
|
||||
|
||||
CLI | REST API
|
||||
------------- | -------------
|
||||
|
||||
@@ -1,45 +1,45 @@
|
||||
Zenith CLI allows you to operate database clusters (catalog clusters) and their commit history locally and in the cloud. Since ANSI calls them catalog clusters and cluster is a loaded term in the modern infrastructure we will call it "catalog".
|
||||
Neon CLI allows you to operate database clusters (catalog clusters) and their commit history locally and in the cloud. Since ANSI calls them catalog clusters and cluster is a loaded term in the modern infrastructure we will call it "catalog".
|
||||
|
||||
# CLI v2 (after chatting with Carl)
|
||||
|
||||
Zenith introduces the notion of a repository.
|
||||
Neon introduces the notion of a repository.
|
||||
|
||||
```bash
|
||||
zenith init
|
||||
zenith clone zenith://zenith.tech/piedpiper/northwind -- clones a repo to the northwind directory
|
||||
neon init
|
||||
neon clone neon://neon.tech/piedpiper/northwind -- clones a repo to the northwind directory
|
||||
```
|
||||
|
||||
Once you have a cluster catalog you can explore it
|
||||
|
||||
```bash
|
||||
zenith log -- returns a list of commits
|
||||
zenith status -- returns if there are changes in the catalog that can be committed
|
||||
zenith commit -- commits the changes and generates a new commit hash
|
||||
zenith branch experimental <hash> -- creates a branch called testdb based on a given commit hash
|
||||
neon log -- returns a list of commits
|
||||
neon status -- returns if there are changes in the catalog that can be committed
|
||||
neon commit -- commits the changes and generates a new commit hash
|
||||
neon branch experimental <hash> -- creates a branch called testdb based on a given commit hash
|
||||
```
|
||||
|
||||
To make changes in the catalog you need to run compute nodes
|
||||
|
||||
```bash
|
||||
-- here is how you a compute node
|
||||
zenith start /home/pipedpiper/northwind:main -- starts a compute instance
|
||||
zenith start zenith://zenith.tech/northwind:main -- starts a compute instance in the cloud
|
||||
neon start /home/pipedpiper/northwind:main -- starts a compute instance
|
||||
neon start neon://neon.tech/northwind:main -- starts a compute instance in the cloud
|
||||
-- you can start a compute node against any hash or branch
|
||||
zenith start /home/pipedpiper/northwind:experimental --port 8008 -- start another compute instance (on different port)
|
||||
neon start /home/pipedpiper/northwind:experimental --port 8008 -- start another compute instance (on different port)
|
||||
-- you can start a compute node against any hash or branch
|
||||
zenith start /home/pipedpiper/northwind:<hash> --port 8009 -- start another compute instance (on different port)
|
||||
neon start /home/pipedpiper/northwind:<hash> --port 8009 -- start another compute instance (on different port)
|
||||
|
||||
-- After running some DML you can run
|
||||
-- zenith status and see how there are two WAL streams one on top of
|
||||
-- neon status and see how there are two WAL streams one on top of
|
||||
-- the main branch
|
||||
zenith status
|
||||
neon status
|
||||
-- and another on top of the experimental branch
|
||||
zenith status -b experimental
|
||||
neon status -b experimental
|
||||
|
||||
-- you can commit each branch separately
|
||||
zenith commit main
|
||||
neon commit main
|
||||
-- or
|
||||
zenith commit -c /home/pipedpiper/northwind:experimental
|
||||
neon commit -c /home/pipedpiper/northwind:experimental
|
||||
```
|
||||
|
||||
Starting compute instances against cloud environments
|
||||
@@ -47,18 +47,18 @@ Starting compute instances against cloud environments
|
||||
```bash
|
||||
-- you can start a compute instance against the cloud environment
|
||||
-- in this case all of the changes will be streamed into the cloud
|
||||
zenith start https://zenith:tech/pipedpiper/northwind:main
|
||||
zenith start https://zenith:tech/pipedpiper/northwind:main
|
||||
zenith status -c https://zenith:tech/pipedpiper/northwind:main
|
||||
zenith commit -c https://zenith:tech/pipedpiper/northwind:main
|
||||
zenith branch -c https://zenith:tech/pipedpiper/northwind:<hash> experimental
|
||||
neon start https://neon:tecj/pipedpiper/northwind:main
|
||||
neon start https://neon:tecj/pipedpiper/northwind:main
|
||||
neon status -c https://neon:tecj/pipedpiper/northwind:main
|
||||
neon commit -c https://neon:tecj/pipedpiper/northwind:main
|
||||
neon branch -c https://neon:tecj/pipedpiper/northwind:<hash> experimental
|
||||
```
|
||||
|
||||
Pushing data into the cloud
|
||||
|
||||
```bash
|
||||
-- pull all the commits from the cloud
|
||||
zenith pull
|
||||
neon pull
|
||||
-- push all the commits to the cloud
|
||||
zenith push
|
||||
neon push
|
||||
```
|
||||
|
||||
@@ -1,14 +1,14 @@
|
||||
# Repository format
|
||||
|
||||
A Zenith repository is similar to a traditional PostgreSQL backup
|
||||
A Neon repository is similar to a traditional PostgreSQL backup
|
||||
archive, like a WAL-G bucket or pgbarman backup catalogue. It holds
|
||||
multiple versions of a PostgreSQL database cluster.
|
||||
|
||||
The distinguishing feature is that you can launch a Zenith Postgres
|
||||
The distinguishing feature is that you can launch a Neon Postgres
|
||||
server directly against a branch in the repository, without having to
|
||||
"restore" it first. Also, Zenith manages the storage automatically,
|
||||
"restore" it first. Also, Neon manages the storage automatically,
|
||||
there is no separation between full and incremental backups nor WAL
|
||||
archive. Zenith relies heavily on the WAL, and uses concepts similar
|
||||
archive. Neon relies heavily on the WAL, and uses concepts similar
|
||||
to incremental backups and WAL archiving internally, but it is hidden
|
||||
from the user.
|
||||
|
||||
@@ -19,15 +19,15 @@ efficient. Just something to get us started.
|
||||
|
||||
The repository directory looks like this:
|
||||
|
||||
.zenith/timelines/4543be3daeab2ed4e58a285cbb8dd1fce6970f8c/wal/
|
||||
.zenith/timelines/4543be3daeab2ed4e58a285cbb8dd1fce6970f8c/snapshots/<lsn>/
|
||||
.zenith/timelines/4543be3daeab2ed4e58a285cbb8dd1fce6970f8c/history
|
||||
.neon/timelines/4543be3daeab2ed4e58a285cbb8dd1fce6970f8c/wal/
|
||||
.neon/timelines/4543be3daeab2ed4e58a285cbb8dd1fce6970f8c/snapshots/<lsn>/
|
||||
.neon/timelines/4543be3daeab2ed4e58a285cbb8dd1fce6970f8c/history
|
||||
|
||||
.zenith/refs/branches/mybranch
|
||||
.zenith/refs/tags/foo
|
||||
.zenith/refs/tags/bar
|
||||
.neon/refs/branches/mybranch
|
||||
.neon/refs/tags/foo
|
||||
.neon/refs/tags/bar
|
||||
|
||||
.zenith/datadirs/<timeline uuid>
|
||||
.neon/datadirs/<timeline uuid>
|
||||
|
||||
### Timelines
|
||||
|
||||
@@ -39,7 +39,7 @@ All WAL is generated on a timeline. You can launch a read-only node
|
||||
against a tag or arbitrary LSN on a timeline, but in order to write,
|
||||
you need to create a timeline.
|
||||
|
||||
Each timeline is stored in a directory under .zenith/timelines. It
|
||||
Each timeline is stored in a directory under .neon/timelines. It
|
||||
consists of a WAL archive, containing all the WAL in the standard
|
||||
PostgreSQL format, under the wal/ subdirectory.
|
||||
|
||||
@@ -66,18 +66,18 @@ contains the UUID of the timeline (and LSN, for tags).
|
||||
|
||||
### Datadirs
|
||||
|
||||
.zenith/datadirs contains PostgreSQL data directories. You can launch
|
||||
.neon/datadirs contains PostgreSQL data directories. You can launch
|
||||
a Postgres instance on one of them with:
|
||||
|
||||
```
|
||||
postgres -D .zenith/datadirs/4543be3daeab2ed4e58a285cbb8dd1fce6970f8c
|
||||
postgres -D .neon/datadirs/4543be3daeab2ed4e58a285cbb8dd1fce6970f8c
|
||||
```
|
||||
|
||||
All the actual data is kept in the timeline directories, under
|
||||
.zenith/timelines. The data directories are only needed for active
|
||||
.neon/timelines. The data directories are only needed for active
|
||||
PostgreQSL instances. After an instance is stopped, the data directory
|
||||
can be safely removed. "zenith start" will recreate it quickly from
|
||||
the data in .zenith/timelines, if it's missing.
|
||||
can be safely removed. "neon start" will recreate it quickly from
|
||||
the data in .neon/timelines, if it's missing.
|
||||
|
||||
## Version 2
|
||||
|
||||
@@ -103,14 +103,14 @@ more advanced. The exact format is TODO. But it should support:
|
||||
|
||||
### Garbage collection
|
||||
|
||||
When you run "zenith gc", old timelines that are no longer needed are
|
||||
When you run "neon gc", old timelines that are no longer needed are
|
||||
removed. That involves collecting the list of "unreachable" objects,
|
||||
starting from the named branches and tags.
|
||||
|
||||
Also, if enough WAL has been generated on a timeline since last
|
||||
snapshot, a new snapshot or delta is created.
|
||||
|
||||
### zenith push/pull
|
||||
### neon push/pull
|
||||
|
||||
Compare the tags and branches on both servers, and copy missing ones.
|
||||
For each branch, compare the timeline it points to in both servers. If
|
||||
@@ -123,7 +123,7 @@ every time you start up an instance? Then you would detect that the
|
||||
timelines have diverged. That would match with the "epoch" concept
|
||||
that we have in the WAL safekeeper
|
||||
|
||||
### zenith checkout/commit
|
||||
### neon checkout/commit
|
||||
|
||||
In this format, there is no concept of a "working tree", and hence no
|
||||
concept of checking out or committing. All modifications are done on
|
||||
@@ -134,7 +134,7 @@ You can easily fork off a temporary timeline to emulate a "working tree".
|
||||
You can later remove it and have it garbage collected, or to "commit",
|
||||
re-point the branch to the new timeline.
|
||||
|
||||
If we want to have a worktree and "zenith checkout/commit" concept, we can
|
||||
If we want to have a worktree and "neon checkout/commit" concept, we can
|
||||
emulate that with a temporary timeline. Create the temporary timeline at
|
||||
"zenith checkout", and have "zenith commit" modify the branch to point to
|
||||
"neon checkout", and have "neon commit" modify the branch to point to
|
||||
the new timeline.
|
||||
|
||||
@@ -4,27 +4,27 @@ How it works now
|
||||
1. Create repository, start page server on it
|
||||
|
||||
```
|
||||
$ zenith init
|
||||
$ neon init
|
||||
...
|
||||
created main branch
|
||||
new zenith repository was created in .zenith
|
||||
new neon repository was created in .neon
|
||||
|
||||
$ zenith pageserver start
|
||||
Starting pageserver at '127.0.0.1:64000' in .zenith
|
||||
$ neon pageserver start
|
||||
Starting pageserver at '127.0.0.1:64000' in .neon
|
||||
Page server started
|
||||
```
|
||||
|
||||
2. Create a branch, and start a Postgres instance on it
|
||||
|
||||
```
|
||||
$ zenith branch heikki main
|
||||
$ neon branch heikki main
|
||||
branching at end of WAL: 0/15ECF68
|
||||
|
||||
$ zenith pg create heikki
|
||||
$ neon pg create heikki
|
||||
Initializing Postgres on timeline 76cf9279915be7797095241638e64644...
|
||||
Extracting base backup to create postgres instance: path=.zenith/pgdatadirs/pg1 port=55432
|
||||
Extracting base backup to create postgres instance: path=.neon/pgdatadirs/pg1 port=55432
|
||||
|
||||
$ zenith pg start pg1
|
||||
$ neon pg start pg1
|
||||
Starting postgres node at 'host=127.0.0.1 port=55432 user=heikki'
|
||||
waiting for server to start.... done
|
||||
server started
|
||||
@@ -52,20 +52,20 @@ serverless on your laptop, so that the workflow becomes just:
|
||||
1. Create repository, start page server on it (same as before)
|
||||
|
||||
```
|
||||
$ zenith init
|
||||
$ neon init
|
||||
...
|
||||
created main branch
|
||||
new zenith repository was created in .zenith
|
||||
new neon repository was created in .neon
|
||||
|
||||
$ zenith pageserver start
|
||||
Starting pageserver at '127.0.0.1:64000' in .zenith
|
||||
$ neon pageserver start
|
||||
Starting pageserver at '127.0.0.1:64000' in .neon
|
||||
Page server started
|
||||
```
|
||||
|
||||
2. Create branch
|
||||
|
||||
```
|
||||
$ zenith branch heikki main
|
||||
$ neon branch heikki main
|
||||
branching at end of WAL: 0/15ECF68
|
||||
```
|
||||
|
||||
|
||||
@@ -7,22 +7,22 @@ Here is a proposal about implementing push/pull mechanics between pageservers. W
|
||||
The origin represents connection info for some remote pageserver. Let's use here same commands as git uses except using explicit list subcommand (git uses `origin -v` for that).
|
||||
|
||||
```
|
||||
zenith origin add <name> <connection_uri>
|
||||
zenith origin list
|
||||
zenith origin remove <name>
|
||||
neon origin add <name> <connection_uri>
|
||||
neon origin list
|
||||
neon origin remove <name>
|
||||
```
|
||||
|
||||
Connection URI a string of form `postgresql://user:pass@hostname:port` (https://www.postgresql.org/docs/13/libpq-connect.html#id-1.7.3.8.3.6). We can start with libpq password auth and later add support for client certs or require ssh as transport or invent some other kind of transport.
|
||||
|
||||
Behind the scenes, this commands may update toml file inside .zenith directory.
|
||||
Behind the scenes, this commands may update toml file inside .neon directory.
|
||||
|
||||
## Push
|
||||
|
||||
### Pushing branch
|
||||
|
||||
```
|
||||
zenith push mybranch cloudserver # push to eponymous branch in cloudserver
|
||||
zenith push mybranch cloudserver:otherbranch # push to a different branch in cloudserver
|
||||
neon push mybranch cloudserver # push to eponymous branch in cloudserver
|
||||
neon push mybranch cloudserver:otherbranch # push to a different branch in cloudserver
|
||||
```
|
||||
|
||||
Exact mechanics would be slightly different in the following situations:
|
||||
|
||||
@@ -2,7 +2,7 @@ While working on export/import commands, I understood that they fit really well
|
||||
|
||||
We may think about backups as snapshots in a different format (i.e plain pgdata format, basebackup tar format, WAL-G format (if they want to support it) and so on). They use same storage API, the only difference is the code that packs/unpacks files.
|
||||
|
||||
Even if zenith aims to maintains durability using it's own snapshots, backups will be useful for uploading data from postgres to zenith.
|
||||
Even if neon aims to maintains durability using it's own snapshots, backups will be useful for uploading data from postgres to neon.
|
||||
|
||||
So here is an attempt to design consistent CLI for different usage scenarios:
|
||||
|
||||
@@ -16,8 +16,8 @@ Save`storage_dest` and other parameters in config.
|
||||
Push snapshots to `storage_dest` in background.
|
||||
|
||||
```
|
||||
zenith init --storage_dest=S3_PREFIX
|
||||
zenith start
|
||||
neon init --storage_dest=S3_PREFIX
|
||||
neon start
|
||||
```
|
||||
|
||||
#### 2. Restart pageserver (manually or crash-recovery).
|
||||
@@ -25,7 +25,7 @@ Take `storage_dest` from pageserver config, start pageserver from latest snapsho
|
||||
Push snapshots to `storage_dest` in background.
|
||||
|
||||
```
|
||||
zenith start
|
||||
neon start
|
||||
```
|
||||
|
||||
#### 3. Import.
|
||||
@@ -35,22 +35,22 @@ Do not save `snapshot_path` and `snapshot_format` in config, as it is a one-time
|
||||
Save`storage_dest` parameters in config.
|
||||
Push snapshots to `storage_dest` in background.
|
||||
```
|
||||
//I.e. we want to start zenith on top of existing $PGDATA and use s3 as a persistent storage.
|
||||
zenith init --snapshot_path=FILE_PREFIX --snapshot_format=pgdata --storage_dest=S3_PREFIX
|
||||
zenith start
|
||||
//I.e. we want to start neon on top of existing $PGDATA and use s3 as a persistent storage.
|
||||
neon init --snapshot_path=FILE_PREFIX --snapshot_format=pgdata --storage_dest=S3_PREFIX
|
||||
neon start
|
||||
```
|
||||
How to pass credentials needed for `snapshot_path`?
|
||||
|
||||
#### 4. Export.
|
||||
Manually push snapshot to `snapshot_path` which differs from `storage_dest`
|
||||
Optionally set `snapshot_format`, which can be plain pgdata format or zenith format.
|
||||
Optionally set `snapshot_format`, which can be plain pgdata format or neon format.
|
||||
```
|
||||
zenith export --snapshot_path=FILE_PREFIX --snapshot_format=pgdata
|
||||
neon export --snapshot_path=FILE_PREFIX --snapshot_format=pgdata
|
||||
```
|
||||
|
||||
#### Notes and questions
|
||||
- safekeeper s3_offload should use same (similar) syntax for storage. How to set it in UI?
|
||||
- Why do we need `zenith init` as a separate command? Can't we init everything at first start?
|
||||
- Why do we need `neon init` as a separate command? Can't we init everything at first start?
|
||||
- We can think of better names for all options.
|
||||
- Export to plain postgres format will be useless, if we are not 100% compatible on page level.
|
||||
I can recall at least one such difference - PD_WAL_LOGGED flag in pages.
|
||||
|
||||
@@ -9,7 +9,7 @@ receival and this might lag behind `term`; safekeeper switches to epoch `n` when
|
||||
it has received all committed log records from all `< n` terms. This roughly
|
||||
corresponds to proposed in
|
||||
|
||||
https://github.com/zenithdb/rfcs/pull/3/files
|
||||
https://github.com/neondatabase/rfcs/pull/3/files
|
||||
|
||||
|
||||
This makes our biggest our difference from Raft. In Raft, every log record is
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# Safekeeper gossip
|
||||
|
||||
Extracted from this [PR](https://github.com/zenithdb/rfcs/pull/13)
|
||||
Extracted from this [PR](https://github.com/neondatabase/rfcs/pull/13)
|
||||
|
||||
## Motivation
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
Created on 19.01.22
|
||||
|
||||
Initially created [here](https://github.com/zenithdb/rfcs/pull/16) by @kelvich.
|
||||
Initially created [here](https://github.com/neondatabase/rfcs/pull/16) by @kelvich.
|
||||
|
||||
That it is an alternative to (014-safekeeper-gossip)[]
|
||||
|
||||
@@ -292,4 +292,4 @@ But with an etcd we are in a bit different situation:
|
||||
1. We don't need persistency and strong consistency guarantees for the data we store in the etcd
|
||||
2. etcd uses Grpc as a protocol, and messages are pretty simple
|
||||
|
||||
So it looks like implementing in-mem store with etcd interface is straightforward thing _if we will want that in future_. At the same time, we can avoid implementing it right now, and we will be able to run local zenith installation with etcd running somewhere in the background (as opposed to building and running console, which in turn requires Postgres).
|
||||
So it looks like implementing in-mem store with etcd interface is straightforward thing _if we will want that in future_. At the same time, we can avoid implementing it right now, and we will be able to run local neon installation with etcd running somewhere in the background (as opposed to building and running console, which in turn requires Postgres).
|
||||
|
||||
@@ -738,6 +738,8 @@ pub enum PagestreamFeMessage {
|
||||
GetPage(PagestreamGetPageRequest),
|
||||
DbSize(PagestreamDbSizeRequest),
|
||||
GetSlruSegment(PagestreamGetSlruSegmentRequest),
|
||||
GetVectoredPages(PagestreamGetVectoredPagesRequest),
|
||||
GetControlFileValues(PagestreamGetControlFileValuesRequest),
|
||||
}
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
@@ -749,6 +751,8 @@ pub enum PagestreamBeMessage {
|
||||
Error(PagestreamErrorResponse),
|
||||
DbSize(PagestreamDbSizeResponse),
|
||||
GetSlruSegment(PagestreamGetSlruSegmentResponse),
|
||||
GetVectoredPages(PagestreamGetVectoredPagesResponse),
|
||||
GetControlFileValues(PagestreamGetControlFileValuesResponse),
|
||||
}
|
||||
|
||||
// Keep in sync with `pagestore_client.h`
|
||||
@@ -760,6 +764,8 @@ enum PagestreamBeMessageTag {
|
||||
Error = 103,
|
||||
DbSize = 104,
|
||||
GetSlruSegment = 105,
|
||||
GetVectoredPages = 106,
|
||||
GetControlFileValues = 107,
|
||||
}
|
||||
impl TryFrom<u8> for PagestreamBeMessageTag {
|
||||
type Error = u8;
|
||||
@@ -771,6 +777,7 @@ impl TryFrom<u8> for PagestreamBeMessageTag {
|
||||
103 => Ok(PagestreamBeMessageTag::Error),
|
||||
104 => Ok(PagestreamBeMessageTag::DbSize),
|
||||
105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
|
||||
106 => Ok(PagestreamBeMessageTag::GetVectoredPages),
|
||||
_ => Err(value),
|
||||
}
|
||||
}
|
||||
@@ -813,6 +820,20 @@ pub struct PagestreamGetSlruSegmentRequest {
|
||||
pub segno: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct PagestreamGetVectoredPagesRequest {
|
||||
pub latest: bool,
|
||||
pub lsn: Lsn,
|
||||
pub rel: RelTag,
|
||||
pub blkno: u32,
|
||||
pub count: u8,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct PagestreamGetControlFileValuesRequest {
|
||||
pub lsn: Lsn,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamExistsResponse {
|
||||
pub exists: bool,
|
||||
@@ -833,6 +854,15 @@ pub struct PagestreamGetSlruSegmentResponse {
|
||||
pub segment: Bytes,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamGetVectoredPagesResponse {
|
||||
pub page_count: u8,
|
||||
pub pages: Bytes,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamGetControlFileValuesResponse {}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamErrorResponse {
|
||||
pub message: String,
|
||||
@@ -904,6 +934,22 @@ impl PagestreamFeMessage {
|
||||
bytes.put_u8(req.kind);
|
||||
bytes.put_u32(req.segno);
|
||||
}
|
||||
|
||||
Self::GetVectoredPages(req) => {
|
||||
bytes.put_u8(5);
|
||||
bytes.put_u8(u8::from(req.latest));
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
bytes.put_u32(req.rel.relnode);
|
||||
bytes.put_u8(req.rel.forknum);
|
||||
bytes.put_u32(req.blkno);
|
||||
bytes.put_u8(req.count);
|
||||
}
|
||||
Self::GetControlFileValues(req) => {
|
||||
bytes.put_u8(6);
|
||||
bytes.put_u64(req.lsn.0);
|
||||
}
|
||||
}
|
||||
|
||||
bytes.into()
|
||||
@@ -962,6 +1008,25 @@ impl PagestreamFeMessage {
|
||||
segno: body.read_u32::<BigEndian>()?,
|
||||
},
|
||||
)),
|
||||
5 => Ok(PagestreamFeMessage::GetVectoredPages(
|
||||
PagestreamGetVectoredPagesRequest {
|
||||
latest: body.read_u8()? != 0,
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
rel: RelTag {
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
relnode: body.read_u32::<BigEndian>()?,
|
||||
forknum: body.read_u8()?,
|
||||
},
|
||||
blkno: body.read_u32::<BigEndian>()?,
|
||||
count: body.read_u8()?,
|
||||
},
|
||||
)),
|
||||
6 => Ok(PagestreamFeMessage::GetControlFileValues(
|
||||
PagestreamGetControlFileValuesRequest {
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
},
|
||||
)),
|
||||
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
|
||||
}
|
||||
}
|
||||
@@ -1003,6 +1068,16 @@ impl PagestreamBeMessage {
|
||||
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
|
||||
bytes.put(&resp.segment[..]);
|
||||
}
|
||||
|
||||
Self::GetVectoredPages(resp) => {
|
||||
bytes.put_u8(Tag::GetVectoredPages as u8);
|
||||
bytes.put_u8(resp.page_count);
|
||||
bytes.put(&resp.pages[..]);
|
||||
}
|
||||
|
||||
Self::GetControlFileValues(_resp) => {
|
||||
bytes.put_u8(Tag::GetControlFileValues as u8);
|
||||
}
|
||||
}
|
||||
|
||||
bytes.into()
|
||||
@@ -1051,6 +1126,18 @@ impl PagestreamBeMessage {
|
||||
segment: segment.into(),
|
||||
})
|
||||
}
|
||||
Tag::GetVectoredPages => {
|
||||
let page_count = buf.read_u8()?;
|
||||
let mut pages = vec![0; page_count as usize * 8192];
|
||||
buf.read_exact(&mut pages)?;
|
||||
Self::GetVectoredPages(PagestreamGetVectoredPagesResponse {
|
||||
page_count,
|
||||
pages: pages.into(),
|
||||
})
|
||||
}
|
||||
Tag::GetControlFileValues => {
|
||||
Self::GetControlFileValues(PagestreamGetControlFileValuesResponse {})
|
||||
}
|
||||
};
|
||||
let remaining = buf.into_inner();
|
||||
if !remaining.is_empty() {
|
||||
@@ -1070,6 +1157,8 @@ impl PagestreamBeMessage {
|
||||
Self::Error(_) => "Error",
|
||||
Self::DbSize(_) => "DbSize",
|
||||
Self::GetSlruSegment(_) => "GetSlruSegment",
|
||||
Self::GetVectoredPages(_) => "GetVectoredPages",
|
||||
Self::GetControlFileValues(_) => "GetControlFileValues",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,8 @@ use futures::SinkExt;
|
||||
use pageserver_api::{
|
||||
models::{
|
||||
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
|
||||
PagestreamGetPageResponse,
|
||||
PagestreamGetPageResponse, PagestreamGetVectoredPagesRequest,
|
||||
PagestreamGetVectoredPagesResponse, PagestreamGetControlFileValuesRequest, PagestreamGetControlFileValuesResponse,
|
||||
},
|
||||
reltag::RelTag,
|
||||
};
|
||||
@@ -157,7 +158,73 @@ impl PagestreamClient {
|
||||
PagestreamBeMessage::Exists(_)
|
||||
| PagestreamBeMessage::Nblocks(_)
|
||||
| PagestreamBeMessage::DbSize(_)
|
||||
| PagestreamBeMessage::GetSlruSegment(_) => {
|
||||
| PagestreamBeMessage::GetSlruSegment(_)
|
||||
| PagestreamBeMessage::GetControlFileValues(_)
|
||||
| PagestreamBeMessage::GetVectoredPages(_) => {
|
||||
anyhow::bail!(
|
||||
"unexpected be message kind in response to getpage request: {}",
|
||||
msg.kind()
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn getpages(
|
||||
&mut self,
|
||||
req: PagestreamGetVectoredPagesRequest,
|
||||
) -> anyhow::Result<PagestreamGetVectoredPagesResponse> {
|
||||
let req = PagestreamFeMessage::GetVectoredPages(req);
|
||||
let req: bytes::Bytes = req.serialize();
|
||||
// let mut req = tokio_util::io::ReaderStream::new(&req);
|
||||
let mut req = tokio_stream::once(Ok(req));
|
||||
|
||||
self.copy_both.send_all(&mut req).await?;
|
||||
|
||||
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
|
||||
let next: bytes::Bytes = next.unwrap()?;
|
||||
|
||||
let msg = PagestreamBeMessage::deserialize(next)?;
|
||||
match msg {
|
||||
PagestreamBeMessage::GetVectoredPages(p) => Ok(p),
|
||||
PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
|
||||
PagestreamBeMessage::Exists(_)
|
||||
| PagestreamBeMessage::Nblocks(_)
|
||||
| PagestreamBeMessage::DbSize(_)
|
||||
| PagestreamBeMessage::GetSlruSegment(_)
|
||||
| PagestreamBeMessage::GetControlFileValues(_)
|
||||
| PagestreamBeMessage::GetPage(_) => {
|
||||
anyhow::bail!(
|
||||
"unexpected be message kind in response to getpage request: {}",
|
||||
msg.kind()
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_control_file(
|
||||
&mut self,
|
||||
req: PagestreamGetControlFileValuesRequest,
|
||||
) -> anyhow::Result<PagestreamGetControlFileValuesResponse> {
|
||||
let req = PagestreamFeMessage::GetControlFileValues(req);
|
||||
let req: bytes::Bytes = req.serialize();
|
||||
// let mut req = tokio_util::io::ReaderStream::new(&req);
|
||||
let mut req = tokio_stream::once(Ok(req));
|
||||
|
||||
self.copy_both.send_all(&mut req).await?;
|
||||
|
||||
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
|
||||
let next: bytes::Bytes = next.unwrap()?;
|
||||
|
||||
let msg = PagestreamBeMessage::deserialize(next)?;
|
||||
match msg {
|
||||
PagestreamBeMessage::GetControlFileValues(p) => Ok(p),
|
||||
PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
|
||||
PagestreamBeMessage::Exists(_)
|
||||
| PagestreamBeMessage::Nblocks(_)
|
||||
| PagestreamBeMessage::DbSize(_)
|
||||
| PagestreamBeMessage::GetSlruSegment(_)
|
||||
| PagestreamBeMessage::GetVectoredPages(_)
|
||||
| PagestreamBeMessage::GetPage(_) => {
|
||||
anyhow::bail!(
|
||||
"unexpected be message kind in response to getpage request: {}",
|
||||
msg.kind()
|
||||
|
||||
@@ -2,7 +2,7 @@ use anyhow::Context;
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::key::{is_rel_block_key, key_to_rel_block, Key};
|
||||
use pageserver_api::keyspace::KeySpaceAccum;
|
||||
use pageserver_api::models::PagestreamGetPageRequest;
|
||||
use pageserver_api::models::{PagestreamGetPageRequest, PagestreamGetControlFileValuesRequest};
|
||||
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::id::TenantTimelineId;
|
||||
@@ -57,6 +57,8 @@ pub(crate) struct Args {
|
||||
/// [`pageserver_api::models::virtual_file::IoEngineKind`].
|
||||
#[clap(long)]
|
||||
set_io_engine: Option<pageserver_api::models::virtual_file::IoEngineKind>,
|
||||
#[clap(long)]
|
||||
lsn: Option<u64>,
|
||||
targets: Option<Vec<TenantTimelineId>>,
|
||||
}
|
||||
|
||||
@@ -300,21 +302,14 @@ async fn main_impl(
|
||||
|
||||
let start = Instant::now();
|
||||
let req = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = &ranges[weights.sample(&mut rng)];
|
||||
let key: i128 = rng.gen_range(r.start..r.end);
|
||||
let key = Key::from_i128(key);
|
||||
assert!(is_rel_block_key(&key));
|
||||
let (rel_tag, block_no) =
|
||||
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
|
||||
PagestreamGetPageRequest {
|
||||
latest: rng.gen_bool(args.req_latest_probability),
|
||||
lsn: r.timeline_lsn,
|
||||
rel: rel_tag,
|
||||
blkno: block_no,
|
||||
PagestreamGetControlFileValuesRequest {
|
||||
lsn: Lsn(args.lsn.unwrap()),
|
||||
}
|
||||
};
|
||||
client.getpage(req).await.unwrap();
|
||||
|
||||
|
||||
client.get_control_file(req).await.unwrap();
|
||||
|
||||
let end = Instant::now();
|
||||
live_stats.request_done();
|
||||
ticks_processed += 1;
|
||||
|
||||
@@ -17,6 +17,13 @@ use futures::stream::FuturesUnordered;
|
||||
use futures::Stream;
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::key::AUX_FILES_KEY;
|
||||
use pageserver_api::key::CONTROLFILE_KEY;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::PagestreamGetControlFileValuesRequest;
|
||||
use pageserver_api::models::PagestreamGetControlFileValuesResponse;
|
||||
use pageserver_api::models::PagestreamGetVectoredPagesRequest;
|
||||
use pageserver_api::models::PagestreamGetVectoredPagesResponse;
|
||||
use pageserver_api::models::TenantState;
|
||||
use pageserver_api::models::{
|
||||
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
|
||||
@@ -70,6 +77,7 @@ use crate::tenant::mgr;
|
||||
use crate::tenant::mgr::get_active_tenant_with_timeout;
|
||||
use crate::tenant::mgr::GetActiveTenantError;
|
||||
use crate::tenant::mgr::ShardSelector;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::timeline::WaitLsnError;
|
||||
use crate::tenant::GetTimelineError;
|
||||
use crate::tenant::PageReconstructError;
|
||||
@@ -333,6 +341,10 @@ enum PageStreamError {
|
||||
#[error("Read error")]
|
||||
Read(#[source] PageReconstructError),
|
||||
|
||||
/// Something went wrong reading a page: this likely indicates a pageserver bug
|
||||
#[error("Vectored read error")]
|
||||
VectoredRead(#[source] GetVectoredError),
|
||||
|
||||
/// Ran out of time waiting for an LSN
|
||||
#[error("LSN timeout: {0}")]
|
||||
LsnTimeout(WaitLsnError),
|
||||
@@ -356,6 +368,15 @@ impl From<PageReconstructError> for PageStreamError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetVectoredError> for PageStreamError {
|
||||
fn from(value: GetVectoredError) -> Self {
|
||||
match value {
|
||||
GetVectoredError::Cancelled => Self::Shutdown,
|
||||
e => Self::VectoredRead(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetActiveTimelineError> for PageStreamError {
|
||||
fn from(value: GetActiveTimelineError) -> Self {
|
||||
match value {
|
||||
@@ -665,6 +686,24 @@ impl PageServerHandler {
|
||||
span,
|
||||
)
|
||||
}
|
||||
PagestreamFeMessage::GetVectoredPages(req) => {
|
||||
let span = tracing::info_span!("handle_get_vectored_pages_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn, req_count = %req.count);
|
||||
(
|
||||
self.handle_get_pages_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await,
|
||||
span,
|
||||
)
|
||||
}
|
||||
PagestreamFeMessage::GetControlFileValues(req) => {
|
||||
let span = tracing::info_span!("handle_get_control_values", req_lsn = %req.lsn);
|
||||
(
|
||||
self.handle_get_control_values(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await,
|
||||
span,
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
match response {
|
||||
@@ -1159,6 +1198,98 @@ impl PageServerHandler {
|
||||
page,
|
||||
}))
|
||||
}
|
||||
#[instrument(skip_all, fields(shard_id))]
|
||||
async fn handle_get_control_values(
|
||||
&mut self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
req: &PagestreamGetControlFileValuesRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
|
||||
let keyspace = KeySpace {
|
||||
ranges: vec![CONTROLFILE_KEY..AUX_FILES_KEY.next()],
|
||||
};
|
||||
let _ = timeline.get_vectored(keyspace, req.lsn, ctx).await;
|
||||
|
||||
Ok(PagestreamBeMessage::GetControlFileValues(
|
||||
PagestreamGetControlFileValuesResponse {},
|
||||
))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(shard_id))]
|
||||
async fn handle_get_pages_at_lsn_request(
|
||||
&mut self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
req: &PagestreamGetVectoredPagesRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
// This is cheeky and relies on not using sharding :)
|
||||
// A real solution has to split the requested key sequence between shards.
|
||||
let get_page_request = PagestreamGetPageRequest {
|
||||
latest: req.latest,
|
||||
lsn: req.lsn,
|
||||
rel: req.rel,
|
||||
blkno: req.blkno,
|
||||
};
|
||||
|
||||
let timeline = match self.get_cached_timeline_for_page(&get_page_request) {
|
||||
Ok(tl) => tl,
|
||||
Err(key) => {
|
||||
match self
|
||||
.load_timeline_for_page(tenant_id, timeline_id, key)
|
||||
.await
|
||||
{
|
||||
Ok(t) => t,
|
||||
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
|
||||
// We already know this tenant exists in general, because we resolved it at
|
||||
// start of connection. Getting a NotFound here indicates that the shard containing
|
||||
// the requested page is not present on this node: the client's knowledge of shard->pageserver
|
||||
// mapping is out of date.
|
||||
//
|
||||
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
|
||||
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
|
||||
// and talk to a different pageserver.
|
||||
return Err(PageStreamError::Reconnect(
|
||||
"getpage@lsn request routed to wrong shard".into(),
|
||||
));
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// load_timeline_for_page sets shard_id, but get_cached_timeline_for_page doesn't
|
||||
set_tracing_field_shard_id(timeline);
|
||||
|
||||
let _timer = timeline
|
||||
.query_metrics
|
||||
.start_timer(metrics::SmgrQueryType::GetPageAtLsn);
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
.await?;
|
||||
|
||||
let (page_count, pages_buf) = timeline
|
||||
.get_rel_pages_at_lsn(
|
||||
req.rel,
|
||||
req.blkno,
|
||||
req.count,
|
||||
Version::Lsn(lsn),
|
||||
req.latest,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(PagestreamBeMessage::GetVectoredPages(
|
||||
PagestreamGetVectoredPagesResponse {
|
||||
page_count,
|
||||
pages: pages_buf,
|
||||
},
|
||||
))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(shard_id))]
|
||||
async fn handle_get_slru_segment_request(
|
||||
|
||||
@@ -11,8 +11,9 @@ use crate::context::RequestContext;
|
||||
use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::repository::*;
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use anyhow::{ensure, Context};
|
||||
use anyhow::{anyhow, ensure, Context};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use enum_map::Enum;
|
||||
use pageserver_api::key::{
|
||||
@@ -26,7 +27,7 @@ use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use postgres_ffi::{Oid, TimestampTz, TransactionId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::{hash_map, HashMap, HashSet};
|
||||
use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
|
||||
use std::ops::ControlFlow;
|
||||
use std::ops::Range;
|
||||
use strum::IntoEnumIterator;
|
||||
@@ -198,6 +199,41 @@ impl Timeline {
|
||||
version.get(self, key, ctx).await
|
||||
}
|
||||
|
||||
pub(crate) async fn get_rel_pages_at_lsn(
|
||||
&self,
|
||||
tag: RelTag,
|
||||
blknum: BlockNumber,
|
||||
count: u8,
|
||||
version: Version<'_>,
|
||||
latest: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(u8, Bytes), GetVectoredError> {
|
||||
if tag.relnode == 0 {
|
||||
return Err(GetVectoredError::Other(
|
||||
RelationError::InvalidRelnode.into(),
|
||||
));
|
||||
}
|
||||
|
||||
let nblocks = self
|
||||
.get_rel_size(tag, version, latest, ctx)
|
||||
.await
|
||||
.map_err(|e| GetVectoredError::Other(anyhow!(e)))?;
|
||||
if blknum + (count - 1) as u32 >= nblocks {
|
||||
debug!(
|
||||
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
|
||||
tag,
|
||||
blknum,
|
||||
version.get_lsn(),
|
||||
nblocks
|
||||
);
|
||||
return Ok((1, ZERO_PAGE.clone()));
|
||||
}
|
||||
|
||||
let start_key = rel_block_to_key(tag, blknum);
|
||||
let end_key = start_key.add(count as u32);
|
||||
version.get_vectored(self, start_key..end_key, ctx).await
|
||||
}
|
||||
|
||||
// Get size of a database in blocks
|
||||
pub(crate) async fn get_db_size(
|
||||
&self,
|
||||
@@ -1604,6 +1640,55 @@ impl<'a> DatadirModification<'a> {
|
||||
self.tline.get(key, lsn, ctx).await
|
||||
}
|
||||
|
||||
async fn get_vectored(
|
||||
&self,
|
||||
key_range: Range<Key>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
|
||||
// Have we already updated the same key? Read the latest pending updated
|
||||
// version in that case.
|
||||
//
|
||||
// Note: we don't check pending_deletions. It is an error to request a
|
||||
// value that has been removed, deletion only avoids leaking storage.
|
||||
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
|
||||
let mut keys_in_modification = KeySpaceAccum::new();
|
||||
|
||||
let key = key_range.start;
|
||||
while key != key_range.end {
|
||||
if let Some(values) = self.pending_updates.get(&key) {
|
||||
if let Some((_, value)) = values.last() {
|
||||
keys_in_modification.add_key(key);
|
||||
|
||||
match value {
|
||||
Value::Image(img) => {
|
||||
results.insert(key, Ok(img.clone()));
|
||||
}
|
||||
_ => {
|
||||
results.insert(
|
||||
key,
|
||||
Err(PageReconstructError::from(anyhow::anyhow!(
|
||||
"unexpected pending WAL record"
|
||||
))),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
|
||||
|
||||
let mut keyspace = KeySpace {
|
||||
ranges: vec![key_range],
|
||||
};
|
||||
keyspace.remove_overlapping_with(&keys_in_modification.to_keyspace());
|
||||
|
||||
let pages = self.tline.get_vectored(keyspace, lsn, ctx).await?;
|
||||
results.extend(pages.into_iter());
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
fn put(&mut self, key: Key, val: Value) {
|
||||
let values = self.pending_updates.entry(key).or_default();
|
||||
// Replace the previous value if it exists at the same lsn
|
||||
@@ -1647,6 +1732,43 @@ impl<'a> Version<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_vectored(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
key_range: Range<Key>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(u8, Bytes), GetVectoredError> {
|
||||
let pages = match self {
|
||||
Version::Lsn(lsn) => {
|
||||
timeline
|
||||
.get_vectored(
|
||||
KeySpace {
|
||||
ranges: vec![key_range],
|
||||
},
|
||||
*lsn,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
}
|
||||
Version::Modified(modification) => modification.get_vectored(key_range, ctx).await,
|
||||
}?;
|
||||
|
||||
let mut buf = BytesMut::new();
|
||||
let page_count: u8 = pages.len().try_into().expect("too many pages returned");
|
||||
for page in pages {
|
||||
match page {
|
||||
(_key, Ok(bytes)) => {
|
||||
buf.extend_from_slice(&bytes[..]);
|
||||
}
|
||||
(_key, Err(err)) => {
|
||||
return Err(GetVectoredError::Other(anyhow!(err)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok((page_count, buf.freeze()))
|
||||
}
|
||||
|
||||
fn get_lsn(&self) -> Lsn {
|
||||
match self {
|
||||
Version::Lsn(lsn) => *lsn,
|
||||
|
||||
@@ -18,10 +18,19 @@
|
||||
//! - An Iterator interface would be more convenient for the callers than the
|
||||
//! 'visit' function
|
||||
//!
|
||||
use async_stream::try_stream;
|
||||
use byteorder::{ReadBytesExt, BE};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use either::Either;
|
||||
use std::{cmp::Ordering, io, result};
|
||||
use futures::Stream;
|
||||
use hex;
|
||||
use std::{
|
||||
cmp::Ordering,
|
||||
io,
|
||||
iter::Rev,
|
||||
ops::{Range, RangeInclusive},
|
||||
result,
|
||||
};
|
||||
use thiserror::Error;
|
||||
use tracing::error;
|
||||
|
||||
@@ -250,6 +259,90 @@ where
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Return a stream which yields all key, value pairs from the index
|
||||
/// starting from the first key greater or equal to `start_key`.
|
||||
///
|
||||
/// Note that this is a copy of [`Self::visit`].
|
||||
/// TODO: Once the sequential read path is removed this will become
|
||||
/// the only index traversal method.
|
||||
pub fn get_stream_from<'a>(
|
||||
&'a self,
|
||||
start_key: &'a [u8; L],
|
||||
ctx: &'a RequestContext,
|
||||
) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a {
|
||||
try_stream! {
|
||||
let mut stack = Vec::new();
|
||||
stack.push((self.root_blk, None));
|
||||
let block_cursor = self.reader.block_cursor();
|
||||
while let Some((node_blknum, opt_iter)) = stack.pop() {
|
||||
// Locate the node.
|
||||
let node_buf = block_cursor
|
||||
.read_blk(self.start_blk + node_blknum, ctx)
|
||||
.await?;
|
||||
|
||||
let node = OnDiskNode::deparse(node_buf.as_ref())?;
|
||||
let prefix_len = node.prefix_len as usize;
|
||||
let suffix_len = node.suffix_len as usize;
|
||||
|
||||
assert!(node.num_children > 0);
|
||||
|
||||
let mut keybuf = Vec::new();
|
||||
keybuf.extend(node.prefix);
|
||||
keybuf.resize(prefix_len + suffix_len, 0);
|
||||
|
||||
let mut iter: Either<Range<usize>, Rev<RangeInclusive<usize>>> = if let Some(iter) = opt_iter {
|
||||
iter
|
||||
} else {
|
||||
// Locate the first match
|
||||
let idx = match node.binary_search(start_key, keybuf.as_mut_slice()) {
|
||||
Ok(idx) => idx,
|
||||
Err(idx) => {
|
||||
if node.level == 0 {
|
||||
// Imagine that the node contains the following keys:
|
||||
//
|
||||
// 1
|
||||
// 3 <-- idx
|
||||
// 5
|
||||
//
|
||||
// If the search key is '2' and there is exact match,
|
||||
// the binary search would return the index of key
|
||||
// '3'. That's cool, '3' is the first key to return.
|
||||
idx
|
||||
} else {
|
||||
// This is an internal page, so each key represents a lower
|
||||
// bound for what's in the child page. If there is no exact
|
||||
// match, we have to return the *previous* entry.
|
||||
//
|
||||
// 1 <-- return this
|
||||
// 3 <-- idx
|
||||
// 5
|
||||
idx.saturating_sub(1)
|
||||
}
|
||||
}
|
||||
};
|
||||
Either::Left(idx..node.num_children.into())
|
||||
};
|
||||
|
||||
// idx points to the first match now. Keep going from there
|
||||
while let Some(idx) = iter.next() {
|
||||
let key_off = idx * suffix_len;
|
||||
let suffix = &node.keys[key_off..key_off + suffix_len];
|
||||
keybuf[prefix_len..].copy_from_slice(suffix);
|
||||
let value = node.value(idx);
|
||||
#[allow(clippy::collapsible_if)]
|
||||
if node.level == 0 {
|
||||
// leaf
|
||||
yield (keybuf.clone(), value.to_u64());
|
||||
} else {
|
||||
stack.push((node_blknum, Some(iter)));
|
||||
stack.push((value.to_blknum(), None));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Scan the tree, starting from 'search_key', in the given direction. 'visitor'
|
||||
/// will be called for every key >= 'search_key' (or <= 'search_key', if scanning
|
||||
|
||||
@@ -46,6 +46,8 @@ use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::BytesMut;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use futures::pin_mut;
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::LayerAccessKind;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -847,10 +849,33 @@ impl DeltaLayerInner {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
let reads = self
|
||||
.plan_reads(keyspace, lsn_range, reconstruct_state, ctx)
|
||||
.await
|
||||
.map_err(GetVectoredError::Other)?;
|
||||
let block_reader = FileBlockReader::new(&self.file, self.file_id);
|
||||
let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
self.index_start_blk,
|
||||
self.index_root_blk,
|
||||
block_reader,
|
||||
);
|
||||
|
||||
let planner = VectoredReadPlanner::new(
|
||||
self.max_vectored_read_bytes
|
||||
.expect("Layer is loaded with max vectored bytes config")
|
||||
.0
|
||||
.into(),
|
||||
);
|
||||
|
||||
let data_end_offset = self.index_start_blk as u64 * PAGE_SZ as u64;
|
||||
|
||||
let reads = Self::plan_reads(
|
||||
keyspace,
|
||||
lsn_range,
|
||||
data_end_offset,
|
||||
index_reader,
|
||||
planner,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(GetVectoredError::Other)?;
|
||||
|
||||
self.do_reads_and_update_state(reads, reconstruct_state)
|
||||
.await;
|
||||
@@ -858,73 +883,68 @@ impl DeltaLayerInner {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn plan_reads(
|
||||
&self,
|
||||
// This is public only for testing purposes.
|
||||
pub(crate) async fn plan_reads<Reader>(
|
||||
keyspace: KeySpace,
|
||||
lsn_range: Range<Lsn>,
|
||||
data_end_offset: u64,
|
||||
index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
|
||||
mut planner: VectoredReadPlanner,
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<VectoredRead>> {
|
||||
let mut planner = VectoredReadPlanner::new(
|
||||
self.max_vectored_read_bytes
|
||||
.expect("Layer is loaded with max vectored bytes config")
|
||||
.0
|
||||
.into(),
|
||||
);
|
||||
|
||||
let block_reader = FileBlockReader::new(&self.file, self.file_id);
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
self.index_start_blk,
|
||||
self.index_root_blk,
|
||||
block_reader,
|
||||
);
|
||||
) -> anyhow::Result<Vec<VectoredRead>>
|
||||
where
|
||||
Reader: BlockReader,
|
||||
{
|
||||
let btree_request_context = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
|
||||
.build();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut range_end_handled = false;
|
||||
|
||||
let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
|
||||
tree_reader
|
||||
.visit(
|
||||
&start_key.0,
|
||||
VisitDirection::Forwards,
|
||||
|raw_key, value| {
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
let lsn = DeltaKey::extract_lsn_from_buf(raw_key);
|
||||
let blob_ref = BlobRef(value);
|
||||
let index_stream = index_reader.get_stream_from(&start_key.0, &btree_request_context);
|
||||
pin_mut!(index_stream);
|
||||
|
||||
assert!(key >= range.start && lsn >= lsn_range.start);
|
||||
while let Some(index_entry) = index_stream.next().await {
|
||||
let (raw_key, value) = index_entry.map_err(|err| anyhow!(err))?;
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
|
||||
let blob_ref = BlobRef(value);
|
||||
|
||||
let cached_lsn = reconstruct_state.get_cached_lsn(&key);
|
||||
let flag = {
|
||||
if cached_lsn >= Some(lsn) {
|
||||
BlobFlag::Ignore
|
||||
} else if blob_ref.will_init() {
|
||||
BlobFlag::Replaces
|
||||
} else {
|
||||
BlobFlag::None
|
||||
}
|
||||
};
|
||||
// Lsns are not monotonically increasing, so we don't assert on them.
|
||||
assert!(key >= range.start);
|
||||
|
||||
if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
|
||||
planner.handle_range_end(blob_ref.pos());
|
||||
range_end_handled = true;
|
||||
false
|
||||
} else {
|
||||
planner.handle(key, lsn, blob_ref.pos(), flag);
|
||||
true
|
||||
}
|
||||
},
|
||||
&RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
|
||||
.build(),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| anyhow!(err))?;
|
||||
let flag = {
|
||||
#[allow(clippy::if_same_then_else)]
|
||||
if lsn >= lsn_range.end || lsn < lsn_range.start {
|
||||
// If the Lsn is not in the queried range it must be ignored
|
||||
BlobFlag::Ignore
|
||||
} else if reconstruct_state.get_cached_lsn(&key) >= Some(lsn) {
|
||||
// If the Lsn is below the caching line it must be ignored
|
||||
BlobFlag::Ignore
|
||||
} else if blob_ref.will_init() {
|
||||
// This blob will replace all previous blobs for this key
|
||||
BlobFlag::Replaces
|
||||
} else {
|
||||
// Usual path: add blob to the read
|
||||
BlobFlag::None
|
||||
}
|
||||
};
|
||||
|
||||
if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
|
||||
planner.handle_range_end(blob_ref.pos());
|
||||
range_end_handled = true;
|
||||
break;
|
||||
} else {
|
||||
planner.handle(key, lsn, blob_ref.pos(), flag);
|
||||
}
|
||||
}
|
||||
|
||||
if !range_end_handled {
|
||||
let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
|
||||
tracing::info!("Handling range end fallback at {}", payload_end);
|
||||
planner.handle_range_end(payload_end);
|
||||
tracing::info!("Handling range end fallback at {}", data_end_offset);
|
||||
planner.handle_range_end(data_end_offset);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1190,3 +1210,131 @@ impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for Del
|
||||
self.size
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use super::*;
|
||||
use crate::{
|
||||
context::DownloadBehavior, task_mgr::TaskKind, tenant::disk_btree::tests::TestDisk,
|
||||
};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
struct BlobSpec {
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
at: u64,
|
||||
}
|
||||
|
||||
fn validate(
|
||||
keyspace: KeySpace,
|
||||
lsn_range: Range<Lsn>,
|
||||
vectored_reads: Vec<VectoredRead>,
|
||||
index_entries: BTreeMap<Key, Vec<Lsn>>,
|
||||
) {
|
||||
let mut planned_blobs = Vec::new();
|
||||
for read in vectored_reads {
|
||||
for (at, meta) in read.blobs_at.as_slice() {
|
||||
planned_blobs.push(BlobSpec {
|
||||
key: meta.key,
|
||||
lsn: meta.lsn,
|
||||
at: *at,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let mut expected_blobs = Vec::new();
|
||||
let mut disk_offset = 0;
|
||||
for (key, lsns) in index_entries {
|
||||
for lsn in lsns {
|
||||
let key_included = keyspace.ranges.iter().any(|range| range.contains(&key));
|
||||
let lsn_included = lsn_range.contains(&lsn);
|
||||
|
||||
if key_included && lsn_included {
|
||||
expected_blobs.push(BlobSpec {
|
||||
key,
|
||||
lsn,
|
||||
at: disk_offset,
|
||||
});
|
||||
}
|
||||
|
||||
disk_offset += 1;
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(planned_blobs, expected_blobs);
|
||||
}
|
||||
|
||||
/// Construct an index for a fictional delta layer and and then
|
||||
/// traverse in order to plan vectored reads for a query. Finally,
|
||||
/// verify that the traversal fed the right index key and value
|
||||
/// pairs into the planner.
|
||||
#[tokio::test]
|
||||
async fn test_delta_layer_index_traversal() {
|
||||
let base_key = Key {
|
||||
field1: 0,
|
||||
field2: 1663,
|
||||
field3: 12972,
|
||||
field4: 16396,
|
||||
field5: 0,
|
||||
field6: 246080,
|
||||
};
|
||||
|
||||
// Populate the index with some entries
|
||||
let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
|
||||
(base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
|
||||
(base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
|
||||
(base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
|
||||
(base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
|
||||
]);
|
||||
|
||||
let mut disk = TestDisk::default();
|
||||
let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);
|
||||
|
||||
let mut disk_offset = 0;
|
||||
for (key, lsns) in &entries {
|
||||
for lsn in lsns {
|
||||
let index_key = DeltaKey::from_key_lsn(key, *lsn);
|
||||
let blob_ref = BlobRef::new(disk_offset, false);
|
||||
writer
|
||||
.append(&index_key.0, blob_ref.0)
|
||||
.expect("In memory disk append should never fail");
|
||||
|
||||
disk_offset += 1;
|
||||
}
|
||||
}
|
||||
|
||||
// Prepare all the arguments for the call into `plan_reads` below
|
||||
let (root_offset, _writer) = writer
|
||||
.finish()
|
||||
.expect("In memory disk finish should never fail");
|
||||
let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
|
||||
let planner = VectoredReadPlanner::new(100);
|
||||
let mut reconstruct_state = ValuesReconstructState::new();
|
||||
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
|
||||
|
||||
let keyspace = KeySpace {
|
||||
ranges: vec![
|
||||
base_key..base_key.add(3),
|
||||
base_key.add(3)..base_key.add(100),
|
||||
],
|
||||
};
|
||||
let lsn_range = Lsn(2)..Lsn(40);
|
||||
|
||||
// Plan and validate
|
||||
let vectored_reads = DeltaLayerInner::plan_reads(
|
||||
keyspace.clone(),
|
||||
lsn_range.clone(),
|
||||
disk_offset,
|
||||
reader,
|
||||
planner,
|
||||
&mut reconstruct_state,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.expect("Read planning should not fail");
|
||||
|
||||
validate(keyspace, lsn_range, vectored_reads, entries);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,6 +43,8 @@ use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use futures::pin_mut;
|
||||
use hex;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::LayerAccessKind;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -54,6 +56,7 @@ use std::ops::Range;
|
||||
use std::os::unix::prelude::FileExt;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio_stream::StreamExt;
|
||||
use tracing::*;
|
||||
|
||||
use utils::{
|
||||
@@ -488,35 +491,33 @@ impl ImageLayerInner {
|
||||
let tree_reader =
|
||||
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
|
||||
|
||||
let btree_request_context = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
|
||||
.build();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut range_end_handled = false;
|
||||
|
||||
let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
range.start.write_to_byte_slice(&mut search_key);
|
||||
|
||||
tree_reader
|
||||
.visit(
|
||||
&search_key,
|
||||
VisitDirection::Forwards,
|
||||
|raw_key, offset| {
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
assert!(key >= range.start);
|
||||
let index_stream = tree_reader.get_stream_from(&search_key, &btree_request_context);
|
||||
pin_mut!(index_stream);
|
||||
|
||||
if key >= range.end {
|
||||
planner.handle_range_end(offset);
|
||||
range_end_handled = true;
|
||||
false
|
||||
} else {
|
||||
planner.handle(key, self.lsn, offset, BlobFlag::None);
|
||||
true
|
||||
}
|
||||
},
|
||||
&RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::ImageLayerBtreeNode)
|
||||
.build(),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| GetVectoredError::Other(anyhow!(err)))?;
|
||||
while let Some(index_entry) = index_stream.next().await {
|
||||
let (raw_key, offset) = index_entry.map_err(|err| anyhow!(err))?;
|
||||
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
assert!(key >= range.start);
|
||||
|
||||
if key >= range.end {
|
||||
planner.handle_range_end(offset);
|
||||
range_end_handled = true;
|
||||
break;
|
||||
} else {
|
||||
planner.handle(key, self.lsn, offset, BlobFlag::None);
|
||||
}
|
||||
}
|
||||
|
||||
if !range_end_handled {
|
||||
let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
|
||||
|
||||
@@ -50,7 +50,6 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::sync::gate::{Gate, GateGuard};
|
||||
|
||||
use crate::pgdatadir_mapping::{AuxFilesDirectory, DirectoryKind};
|
||||
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
|
||||
use crate::tenant::{
|
||||
layer_map::{LayerMap, SearchResult},
|
||||
@@ -75,6 +74,10 @@ use crate::{
|
||||
disk_usage_eviction_task::EvictionCandidate, tenant::storage_layer::delta_layer::DeltaEntry,
|
||||
};
|
||||
use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
|
||||
use crate::{
|
||||
pgdatadir_mapping::{AuxFilesDirectory, DirectoryKind},
|
||||
virtual_file::MaybeFatalIo,
|
||||
};
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||
@@ -3426,10 +3429,14 @@ impl Timeline {
|
||||
// The write_to_disk() above calls writer.finish() which already did the fsync of the inodes.
|
||||
// We just need to fsync the directory in which these inodes are linked,
|
||||
// which we know to be the timeline directory.
|
||||
//
|
||||
// We use fatal_err() below because the after write_to_disk returns with success,
|
||||
// the in-memory state of the filesystem already has the layer file in its final place,
|
||||
// and subsequent pageserver code could think it's durable while it really isn't.
|
||||
par_fsync::par_fsync(&[self_clone
|
||||
.conf
|
||||
.timeline_path(&self_clone.tenant_shard_id, &self_clone.timeline_id)])
|
||||
.context("fsync of timeline dir")?;
|
||||
.fatal_err("fsync of timeline dir");
|
||||
|
||||
anyhow::Ok(new_delta)
|
||||
}
|
||||
@@ -3662,11 +3669,14 @@ impl Timeline {
|
||||
// We just need to fsync the directory in which these inodes are linked,
|
||||
// which we know to be the timeline directory.
|
||||
if !image_layers.is_empty() {
|
||||
// We use fatal_err() below because the after writer.finish() returns with success,
|
||||
// the in-memory state of the filesystem already has the layer file in its final place,
|
||||
// and subsequent pageserver code could think it's durable while it really isn't.
|
||||
par_fsync::par_fsync_async(&[self
|
||||
.conf
|
||||
.timeline_path(&self.tenant_shard_id, &self.timeline_id)])
|
||||
.await
|
||||
.context("fsync of timeline dir")?;
|
||||
.fatal_err("fsync of timeline dir");
|
||||
}
|
||||
|
||||
let mut guard = self.layers.write().await;
|
||||
@@ -4251,12 +4261,16 @@ impl Timeline {
|
||||
// The writer.finish() above already did the fsync of the inodes.
|
||||
// We just need to fsync the directory in which these inodes are linked,
|
||||
// which we know to be the timeline directory.
|
||||
//
|
||||
// We use fatal_err() below because the after writer.finish() returns with success,
|
||||
// the in-memory state of the filesystem already has the layer file in its final place,
|
||||
// and subsequent pageserver code could think it's durable while it really isn't.
|
||||
let timeline_dir = self
|
||||
.conf
|
||||
.timeline_path(&self.tenant_shard_id, &self.timeline_id);
|
||||
par_fsync::par_fsync_async(&[timeline_dir])
|
||||
.await
|
||||
.context("fsync of timeline dir")?;
|
||||
.fatal_err("fsync of timeline dir");
|
||||
}
|
||||
|
||||
stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
|
||||
|
||||
@@ -74,6 +74,8 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
|
||||
}
|
||||
prev = Some(req);
|
||||
}
|
||||
PagestreamFeMessage::GetVectoredPages(_) => {}
|
||||
PagestreamFeMessage::GetControlFileValues(_) => {}
|
||||
PagestreamFeMessage::DbSize(_) => {}
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user