mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-05 11:40:37 +00:00
Compare commits
1 Commits
exp-07-18
...
kelvich-pa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f5a4a1eba5 |
@@ -1,13 +0,0 @@
|
||||
# The binaries are really slow, if you compile them in 'dev' mode with the defaults.
|
||||
# Enable some optimizations even in 'dev' mode, to make tests faster. The basic
|
||||
# optimizations enabled by "opt-level=1" don't affect debuggability too much.
|
||||
#
|
||||
# See https://www.reddit.com/r/rust/comments/gvrgca/this_is_a_neat_trick_for_getting_good_runtime/
|
||||
#
|
||||
[profile.dev.package."*"]
|
||||
# Set the default for dependencies in Development mode.
|
||||
opt-level = 3
|
||||
|
||||
[profile.dev]
|
||||
# Turn on a small amount of optimization in Development mode.
|
||||
opt-level = 1
|
||||
15
.github/actions/run-python-test-set/action.yml
vendored
15
.github/actions/run-python-test-set/action.yml
vendored
@@ -37,12 +37,6 @@ runs:
|
||||
name: neon-${{ runner.os }}-${{ inputs.build_type }}-${{ inputs.rust_toolchain }}-artifact
|
||||
path: ./neon-artifact/
|
||||
|
||||
- name: Get Postgres artifact for restoration
|
||||
uses: actions/download-artifact@v3
|
||||
with:
|
||||
name: postgres-${{ runner.os }}-${{ inputs.build_type }}-artifact
|
||||
path: ./pg-artifact/
|
||||
|
||||
- name: Extract Neon artifact
|
||||
shell: bash -ex {0}
|
||||
run: |
|
||||
@@ -50,13 +44,6 @@ runs:
|
||||
tar -xf ./neon-artifact/neon.tgz -C /tmp/neon/
|
||||
rm -rf ./neon-artifact/
|
||||
|
||||
- name: Extract Postgres artifact
|
||||
shell: bash -ex {0}
|
||||
run: |
|
||||
mkdir -p /tmp/neon/tmp_install
|
||||
tar -xf ./pg-artifact/pg.tgz -C /tmp/neon/tmp_install
|
||||
rm -rf ./pg-artifact/
|
||||
|
||||
- name: Checkout
|
||||
if: inputs.needs_postgres_source == 'true'
|
||||
uses: actions/checkout@v3
|
||||
@@ -78,7 +65,7 @@ runs:
|
||||
- name: Run pytest
|
||||
env:
|
||||
NEON_BIN: /tmp/neon/bin
|
||||
POSTGRES_DISTRIB_DIR: /tmp/neon/tmp_install
|
||||
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
|
||||
TEST_OUTPUT: /tmp/test_output
|
||||
# this variable will be embedded in perf test report
|
||||
# and is needed to distinguish different environments
|
||||
|
||||
9
.github/workflows/benchmarking.yml
vendored
9
.github/workflows/benchmarking.yml
vendored
@@ -104,12 +104,3 @@ jobs:
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
run: |
|
||||
REPORT_FROM=$(realpath perf-report-staging) REPORT_TO=staging scripts/generate_and_push_perf_report.sh
|
||||
|
||||
- name: Post to a Slack channel
|
||||
if: ${{ github.event.schedule && failure() }}
|
||||
uses: slackapi/slack-github-action@v1
|
||||
with:
|
||||
channel-id: "C033QLM5P7D" # dev-staging-stream
|
||||
slack-message: "Periodic perf testing: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
|
||||
env:
|
||||
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
|
||||
50
.github/workflows/build_and_test.yml
vendored
50
.github/workflows/build_and_test.yml
vendored
@@ -1,10 +1,9 @@
|
||||
name: Test and Deploy
|
||||
name: Test
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
- release
|
||||
pull_request:
|
||||
|
||||
defaults:
|
||||
@@ -12,9 +11,8 @@ defaults:
|
||||
shell: bash -ex {0}
|
||||
|
||||
concurrency:
|
||||
# Allow only one workflow per any non-`main` branch.
|
||||
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.ref == 'refs/heads/main' && github.sha || 'anysha' }}
|
||||
cancel-in-progress: true
|
||||
group: ${{ github.workflow }}-${{ github.ref }}
|
||||
cancel-in-progress: true
|
||||
|
||||
env:
|
||||
RUST_BACKTRACE: 1
|
||||
@@ -95,17 +93,12 @@ jobs:
|
||||
tar -xf ./postgres-artifact/pg.tgz -C ./tmp_install/
|
||||
rm -rf ./postgres-artifact/
|
||||
|
||||
# Don't include the ~/.cargo/registry/src directory. It contains just
|
||||
# uncompressed versions of the crates in ~/.cargo/registry/cache
|
||||
# directory, and it's faster to let 'cargo' to rebuild it from the
|
||||
# compressed crates.
|
||||
- name: Cache cargo deps
|
||||
id: cache_cargo
|
||||
uses: actions/cache@v3
|
||||
with:
|
||||
path: |
|
||||
~/.cargo/registry/
|
||||
!~/.cargo/registry/src
|
||||
~/.cargo/git/
|
||||
target/
|
||||
# Fall back to older versions of the key, if no cache for current Cargo.lock was found
|
||||
@@ -177,14 +170,14 @@ jobs:
|
||||
for bin in $test_exe_paths; do
|
||||
SRC=$bin
|
||||
DST=/tmp/neon/test_bin/$(basename $bin)
|
||||
|
||||
# We don't need debug symbols for code coverage, so strip them out to make
|
||||
# the artifact smaller.
|
||||
strip "$SRC" -o "$DST"
|
||||
cp "$SRC" "$DST"
|
||||
echo "$DST" >> /tmp/coverage/binaries.list
|
||||
done
|
||||
fi
|
||||
|
||||
- name: Install postgres binaries
|
||||
run: cp -a tmp_install /tmp/neon/pg_install
|
||||
|
||||
- name: Prepare neon artifact
|
||||
run: tar -C /tmp/neon/ -czf ./neon.tgz .
|
||||
|
||||
@@ -305,7 +298,6 @@ jobs:
|
||||
with:
|
||||
path: |
|
||||
~/.cargo/registry/
|
||||
!~/.cargo/registry/src
|
||||
~/.cargo/git/
|
||||
target/
|
||||
key: v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }}
|
||||
@@ -449,14 +441,14 @@ jobs:
|
||||
fi
|
||||
id: legacy-build-tag
|
||||
|
||||
- name: Build neon Docker image
|
||||
- name: Build compute-tools Docker image
|
||||
uses: docker/build-push-action@v2
|
||||
with:
|
||||
context: .
|
||||
build-args: |
|
||||
GIT_VERSION="${{github.sha}}"
|
||||
AWS_ACCESS_KEY_ID="${{secrets.CACHEPOT_AWS_ACCESS_KEY_ID}}"
|
||||
AWS_SECRET_ACCESS_KEY="${{secrets.CACHEPOT_AWS_SECRET_ACCESS_KEY}}"
|
||||
GIT_VERSION="${GITHUB_SHA}"
|
||||
AWS_ACCESS_KEY_ID="${CACHEPOT_AWS_ACCESS_KEY_ID}"
|
||||
AWS_SECRET_ACCESS_KEY="${CACHEPOT_AWS_SECRET_ACCESS_KEY}"
|
||||
pull: true
|
||||
push: true
|
||||
tags: neondatabase/neon:${{steps.legacy-build-tag.outputs.tag}}, neondatabase/neon:${{steps.build-tag.outputs.tag}}
|
||||
@@ -516,9 +508,8 @@ jobs:
|
||||
with:
|
||||
context: .
|
||||
build-args: |
|
||||
GIT_VERSION="${{github.sha}}"
|
||||
AWS_ACCESS_KEY_ID="${{secrets.CACHEPOT_AWS_ACCESS_KEY_ID}}"
|
||||
AWS_SECRET_ACCESS_KEY="${{secrets.CACHEPOT_AWS_SECRET_ACCESS_KEY}}"
|
||||
AWS_ACCESS_KEY_ID="${CACHEPOT_AWS_ACCESS_KEY_ID}"
|
||||
AWS_SECRET_ACCESS_KEY="${CACHEPOT_AWS_SECRET_ACCESS_KEY}"
|
||||
push: false
|
||||
file: Dockerfile.compute-tools
|
||||
tags: neondatabase/compute-tools:local
|
||||
@@ -528,9 +519,8 @@ jobs:
|
||||
with:
|
||||
context: .
|
||||
build-args: |
|
||||
GIT_VERSION="${{github.sha}}"
|
||||
AWS_ACCESS_KEY_ID="${{secrets.CACHEPOT_AWS_ACCESS_KEY_ID}}"
|
||||
AWS_SECRET_ACCESS_KEY="${{secrets.CACHEPOT_AWS_SECRET_ACCESS_KEY}}"
|
||||
AWS_ACCESS_KEY_ID="${CACHEPOT_AWS_ACCESS_KEY_ID}"
|
||||
AWS_SECRET_ACCESS_KEY="${CACHEPOT_AWS_SECRET_ACCESS_KEY}"
|
||||
push: true
|
||||
file: Dockerfile.compute-tools
|
||||
tags: neondatabase/compute-tools:${{steps.legacy-build-tag.outputs.tag}}
|
||||
@@ -568,11 +558,7 @@ jobs:
|
||||
|
||||
deploy:
|
||||
runs-on: [ self-hosted, Linux, k8s-runner ]
|
||||
# We need both storage **and** compute images for deploy, because control plane
|
||||
# picks the compute version based on the storage version. If it notices a fresh
|
||||
# storage it may bump the compute version. And if compute image failed to build
|
||||
# it may break things badly.
|
||||
needs: [ docker-image, docker-image-compute, calculate-deploy-targets ]
|
||||
needs: [ docker-image, calculate-deploy-targets ]
|
||||
if: |
|
||||
(github.ref_name == 'main' || github.ref_name == 'release') &&
|
||||
github.event_name != 'workflow_dispatch'
|
||||
@@ -615,9 +601,7 @@ jobs:
|
||||
|
||||
deploy-proxy:
|
||||
runs-on: [ self-hosted, Linux, k8s-runner ]
|
||||
# Compute image isn't strictly required for proxy deploy, but let's still wait for it
|
||||
# to run all deploy jobs consistently.
|
||||
needs: [ docker-image, docker-image-compute, calculate-deploy-targets ]
|
||||
needs: [ docker-image, calculate-deploy-targets ]
|
||||
if: |
|
||||
(github.ref_name == 'main' || github.ref_name == 'release') &&
|
||||
github.event_name != 'workflow_dispatch'
|
||||
|
||||
6
.github/workflows/codestyle.yml
vendored
6
.github/workflows/codestyle.yml
vendored
@@ -11,9 +11,8 @@ defaults:
|
||||
shell: bash -ex {0}
|
||||
|
||||
concurrency:
|
||||
# Allow only one workflow per any non-`main` branch.
|
||||
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.ref == 'refs/heads/main' && github.sha || 'anysha' }}
|
||||
cancel-in-progress: true
|
||||
group: ${{ github.workflow }}-${{ github.ref }}
|
||||
cancel-in-progress: true
|
||||
|
||||
env:
|
||||
RUST_BACKTRACE: 1
|
||||
@@ -98,7 +97,6 @@ jobs:
|
||||
with:
|
||||
path: |
|
||||
~/.cargo/registry
|
||||
!~/.cargo/registry/src
|
||||
~/.cargo/git
|
||||
target
|
||||
key: ${{ runner.os }}-cargo-${{ hashFiles('./Cargo.lock') }}-rust-${{ matrix.rust_toolchain }}
|
||||
|
||||
5
.github/workflows/pg_clients.yml
vendored
5
.github/workflows/pg_clients.yml
vendored
@@ -13,9 +13,8 @@ on:
|
||||
workflow_dispatch:
|
||||
|
||||
concurrency:
|
||||
# Allow only one workflow per any non-`main` branch.
|
||||
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.ref == 'refs/heads/main' && github.sha || 'anysha' }}
|
||||
cancel-in-progress: true
|
||||
group: ${{ github.workflow }}-${{ github.ref }}
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
test-postgres-client-libs:
|
||||
|
||||
774
Cargo.lock
generated
774
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -62,13 +62,6 @@ brew install protobuf etcd openssl
|
||||
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
|
||||
```
|
||||
|
||||
3. Install PostgreSQL Client
|
||||
```
|
||||
# from https://stackoverflow.com/questions/44654216/correct-way-to-install-psql-without-full-postgres-on-macos
|
||||
brew install libpq
|
||||
brew link --force libpq
|
||||
```
|
||||
|
||||
#### Building on Linux and OSX
|
||||
|
||||
1. Build neon and patched postgres
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
use std::path::Path;
|
||||
|
||||
use anyhow::Result;
|
||||
use anyhow::{anyhow, Result};
|
||||
use log::{info, log_enabled, warn, Level};
|
||||
use postgres::error::SqlState;
|
||||
use postgres::{Client, NoTls};
|
||||
use serde::Deserialize;
|
||||
|
||||
@@ -394,34 +395,20 @@ pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> {
|
||||
|
||||
// This will only change ownership on the schema itself, not the objects
|
||||
// inside it. Without it owner of the `public` schema will be `cloud_admin`
|
||||
// and database owner cannot do anything with it. SQL procedure ensures
|
||||
// that it won't error out if schema `public` doesn't exist.
|
||||
let alter_query = format!(
|
||||
"DO $$\n\
|
||||
DECLARE\n\
|
||||
schema_owner TEXT;\n\
|
||||
BEGIN\n\
|
||||
IF EXISTS(\n\
|
||||
SELECT nspname\n\
|
||||
FROM pg_catalog.pg_namespace\n\
|
||||
WHERE nspname = 'public'\n\
|
||||
)\n\
|
||||
THEN\n\
|
||||
SELECT nspowner::regrole::text\n\
|
||||
FROM pg_catalog.pg_namespace\n\
|
||||
WHERE nspname = 'public'\n\
|
||||
INTO schema_owner;\n\
|
||||
\n\
|
||||
IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin'\n\
|
||||
THEN\n\
|
||||
ALTER SCHEMA public OWNER TO {};\n\
|
||||
END IF;\n\
|
||||
END IF;\n\
|
||||
END\n\
|
||||
$$;",
|
||||
db.owner.quote()
|
||||
);
|
||||
db_client.simple_query(&alter_query)?;
|
||||
// and database owner cannot do anything with it.
|
||||
let alter_query = format!("ALTER SCHEMA public OWNER TO {}", db.owner.quote());
|
||||
let res = db_client.simple_query(&alter_query);
|
||||
|
||||
if let Err(e) = res {
|
||||
if e.code() == Some(&SqlState::INVALID_SCHEMA_NAME) {
|
||||
// This is OK, db just don't have a `public` schema.
|
||||
// Probably user dropped it manually.
|
||||
info!("no 'public' schema found in the database {}", db.name);
|
||||
} else {
|
||||
// Something different happened, propagate the error
|
||||
return Err(anyhow!(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
1
docs/.gitignore
vendored
1
docs/.gitignore
vendored
@@ -1 +0,0 @@
|
||||
book
|
||||
14
docs/README.md
Normal file
14
docs/README.md
Normal file
@@ -0,0 +1,14 @@
|
||||
# Zenith documentation
|
||||
|
||||
## Table of contents
|
||||
|
||||
- [authentication.md](authentication.md) — pageserver JWT authentication.
|
||||
- [docker.md](docker.md) — Docker images and building pipeline.
|
||||
- [glossary.md](glossary.md) — Glossary of all the terms used in codebase.
|
||||
- [multitenancy.md](multitenancy.md) — how multitenancy is organized in the pageserver and Zenith CLI.
|
||||
- [sourcetree.md](sourcetree.md) — Overview of the source tree layout.
|
||||
- [pageserver/README.md](/pageserver/README.md) — pageserver overview.
|
||||
- [postgres_ffi/README.md](/libs/postgres_ffi/README.md) — Postgres FFI overview.
|
||||
- [test_runner/README.md](/test_runner/README.md) — tests infrastructure overview.
|
||||
- [safekeeper/README.md](/safekeeper/README.md) — WAL service overview.
|
||||
- [core_changes.md](core_changes.md) - Description of Zenith changes in Postgres core
|
||||
@@ -1,84 +0,0 @@
|
||||
# Summary
|
||||
|
||||
[Introduction]()
|
||||
- [Separation of Compute and Storage](./separation-compute-storage.md)
|
||||
|
||||
# Architecture
|
||||
|
||||
- [Compute]()
|
||||
- [WAL proposer]()
|
||||
- [WAL Backpressure]()
|
||||
- [Postgres changes](./core_changes.md)
|
||||
|
||||
- [Pageserver](./pageserver.md)
|
||||
- [Services](./pageserver-services.md)
|
||||
- [Thread management](./pageserver-thread-mgmt.md)
|
||||
- [WAL Redo](./pageserver-walredo.md)
|
||||
- [Page cache](./pageserver-pagecache.md)
|
||||
- [Storage](./pageserver-storage.md)
|
||||
- [Datadir mapping]()
|
||||
- [Layer files]()
|
||||
- [Branching]()
|
||||
- [Garbage collection]()
|
||||
- [Cloud Storage]()
|
||||
- [Processing a GetPage request](./pageserver-processing-getpage.md)
|
||||
- [Processing WAL](./pageserver-processing-wal.md)
|
||||
- [Management API]()
|
||||
- [Tenant Rebalancing]()
|
||||
|
||||
- [WAL Service](walservice.md)
|
||||
- [Consensus protocol](safekeeper-protocol.md)
|
||||
- [Management API]()
|
||||
- [Rebalancing]()
|
||||
|
||||
- [Control Plane]()
|
||||
|
||||
- [Proxy]()
|
||||
|
||||
- [Source view](./sourcetree.md)
|
||||
- [docker.md](./docker.md) — Docker images and building pipeline.
|
||||
- [Error handling and logging]()
|
||||
- [Testing]()
|
||||
- [Unit testing]()
|
||||
- [Integration testing]()
|
||||
- [Benchmarks]()
|
||||
|
||||
|
||||
- [Glossary](./glossary.md)
|
||||
|
||||
# Uncategorized
|
||||
|
||||
- [authentication.md](./authentication.md)
|
||||
- [multitenancy.md](./multitenancy.md) — how multitenancy is organized in the pageserver and Zenith CLI.
|
||||
- [settings.md](./settings.md)
|
||||
#FIXME: move these under sourcetree.md
|
||||
#- [pageserver/README.md](/pageserver/README.md)
|
||||
#- [postgres_ffi/README.md](/libs/postgres_ffi/README.md)
|
||||
#- [test_runner/README.md](/test_runner/README.md)
|
||||
#- [safekeeper/README.md](/safekeeper/README.md)
|
||||
|
||||
|
||||
# RFCs
|
||||
|
||||
- [RFCs](./rfcs/README.md)
|
||||
|
||||
- [002-storage](rfcs/002-storage.md)
|
||||
- [003-laptop-cli](rfcs/003-laptop-cli.md)
|
||||
- [004-durability](rfcs/004-durability.md)
|
||||
- [005-zenith_local](rfcs/005-zenith_local.md)
|
||||
- [006-laptop-cli-v2-CLI](rfcs/006-laptop-cli-v2-CLI.md)
|
||||
- [006-laptop-cli-v2-repository-structure](rfcs/006-laptop-cli-v2-repository-structure.md)
|
||||
- [007-serverless-on-laptop](rfcs/007-serverless-on-laptop.md)
|
||||
- [008-push-pull](rfcs/008-push-pull.md)
|
||||
- [009-snapshot-first-storage-cli](rfcs/009-snapshot-first-storage-cli.md)
|
||||
- [009-snapshot-first-storage](rfcs/009-snapshot-first-storage.md)
|
||||
- [009-snapshot-first-storage-pitr](rfcs/009-snapshot-first-storage-pitr.md)
|
||||
- [010-storage_details](rfcs/010-storage_details.md)
|
||||
- [011-retention-policy](rfcs/011-retention-policy.md)
|
||||
- [012-background-tasks](rfcs/012-background-tasks.md)
|
||||
- [013-term-history](rfcs/013-term-history.md)
|
||||
- [014-safekeepers-gossip](rfcs/014-safekeepers-gossip.md)
|
||||
- [014-storage-lsm](rfcs/014-storage-lsm.md)
|
||||
- [015-storage-messaging](rfcs/015-storage-messaging.md)
|
||||
- [016-connection-routing](rfcs/016-connection-routing.md)
|
||||
- [cluster-size-limits](rfcs/cluster-size-limits.md)
|
||||
@@ -1,5 +0,0 @@
|
||||
[book]
|
||||
language = "en"
|
||||
multilingual = false
|
||||
src = "."
|
||||
title = "Neon architecture"
|
||||
@@ -1,12 +1,3 @@
|
||||
# Postgres core changes
|
||||
|
||||
This lists all the changes that have been made to the PostgreSQL
|
||||
source tree, as a somewhat logical set of patches. The long-term goal
|
||||
is to eliminate all these changes, by submitting patches to upstream
|
||||
and refactoring code into extensions, so that you can run unmodified
|
||||
PostgreSQL against Neon storage.
|
||||
|
||||
|
||||
1. Add t_cid to XLOG record
|
||||
- Why?
|
||||
The cmin/cmax on a heap page is a real bummer. I don't see any other way to fix that than bite the bullet and modify the WAL-logging routine to include the cmin/cmax.
|
||||
|
||||
@@ -1,9 +0,0 @@
|
||||
# Page Service
|
||||
|
||||
The Page Service listens for GetPage@LSN requests from the Compute Nodes,
|
||||
and responds with pages from the repository. On each GetPage@LSN request,
|
||||
it calls into the Repository function
|
||||
|
||||
A separate thread is spawned for each incoming connection to the page
|
||||
service. The page service uses the libpq protocol to communicate with
|
||||
the client. The client is a Compute Postgres instance.
|
||||
@@ -1,8 +0,0 @@
|
||||
# Page cache
|
||||
|
||||
TODO:
|
||||
|
||||
- shared across tenants
|
||||
- store pages from layer files
|
||||
- store pages from "in-memory layer"
|
||||
- store materialized pages
|
||||
@@ -1,4 +0,0 @@
|
||||
# Processing a GetPage request
|
||||
|
||||
TODO:
|
||||
- sequence diagram that shows how a GetPage@LSN request is processed
|
||||
@@ -1,5 +0,0 @@
|
||||
# Processing WAL
|
||||
|
||||
TODO:
|
||||
- diagram that shows how incoming WAL is processed
|
||||
- explain durability, what is fsync'd when, disk_consistent_lsn
|
||||
@@ -1,26 +0,0 @@
|
||||
## Thread management
|
||||
|
||||
Each thread in the system is tracked by the `thread_mgr` module. It
|
||||
maintains a registry of threads, and which tenant or timeline they are
|
||||
operating on. This is used for safe shutdown of a tenant, or the whole
|
||||
system.
|
||||
|
||||
### Handling shutdown
|
||||
|
||||
When a tenant or timeline is deleted, we need to shut down all threads
|
||||
operating on it, before deleting the data on disk. A thread registered
|
||||
in the thread registry can check if it has been requested to shut down,
|
||||
by calling `is_shutdown_requested()`. For async operations, there's also
|
||||
a `shudown_watcher()` async task that can be used to wake up on shutdown.
|
||||
|
||||
### Sync vs async
|
||||
|
||||
The primary programming model in the page server is synchronous,
|
||||
blocking code. However, there are some places where async code is
|
||||
used. Be very careful when mixing sync and async code.
|
||||
|
||||
Async is primarily used to wait for incoming data on network
|
||||
connections. For example, all WAL receivers have a shared thread pool,
|
||||
with one async Task for each connection. Once a piece of WAL has been
|
||||
received from the network, the thread calls the blocking functions in
|
||||
the Repository to process the WAL.
|
||||
@@ -1,77 +0,0 @@
|
||||
# WAL Redo
|
||||
|
||||
To reconstruct a particular page version from an image of the page and
|
||||
some WAL records, the pageserver needs to replay the WAL records. This
|
||||
happens on-demand, when a GetPage@LSN request comes in, or as part of
|
||||
background jobs that reorganize data for faster access.
|
||||
|
||||
It's important that data cannot leak from one tenant to another, and
|
||||
that a corrupt WAL record on one timeline doesn't affect other tenants
|
||||
or timelines.
|
||||
|
||||
## Multi-tenant security
|
||||
|
||||
If you have direct access to the WAL directory, or if you have
|
||||
superuser access to a running PostgreSQL server, it's easy to
|
||||
construct a malicious or corrupt WAL record that causes the WAL redo
|
||||
functions to crash, or to execute arbitrary code. That is not a
|
||||
security problem for PostgreSQL; if you have superuser access, you
|
||||
have full access to the system anyway.
|
||||
|
||||
The Neon pageserver, however, is multi-tenant. It needs to execute WAL
|
||||
belonging to different tenants in the same system, and malicious WAL
|
||||
in one tenant must not affect other tenants.
|
||||
|
||||
A separate WAL redo process is launched for each tenant, and the
|
||||
process uses the seccomp(2) system call to restrict its access to the
|
||||
bare minimum needed to replay WAL records. The process does not have
|
||||
access to the filesystem or network. It can only communicate with the
|
||||
parent pageserver process through a pipe.
|
||||
|
||||
If an attacker creates a malicious WAL record and injects it into the
|
||||
WAL stream of a timeline, he can take control of the WAL redo process
|
||||
in the pageserver. However, the WAL redo process cannot access the
|
||||
rest of the system. And because there is a separate WAL redo process
|
||||
for each tenant, the hijacked WAL redo process can only see WAL and
|
||||
data belonging to the same tenant, which the attacker would have
|
||||
access to anyway.
|
||||
|
||||
## WAL-redo process communication
|
||||
|
||||
The WAL redo process runs the 'postgres' executable, launched with a
|
||||
Neon-specific command-line option to put it into WAL-redo process
|
||||
mode. The pageserver controls the lifetime of the WAL redo processes,
|
||||
launching them as needed. If a tenant is detached from the pageserver,
|
||||
any WAL redo processes for that tenant are killed.
|
||||
|
||||
The pageserver communicates with each WAL redo process over its
|
||||
stdin/stdout/stderr. It works in request-response model with a simple
|
||||
custom protocol, described in walredo.rs. To replay a set of WAL
|
||||
records for a page, the pageserver sends the "before" image of the
|
||||
page and the WAL records over 'stdin', followed by a command to
|
||||
perform the replay. The WAL redo process responds with an "after"
|
||||
image of the page.
|
||||
|
||||
## Special handling of some records
|
||||
|
||||
Some WAL record types are handled directly in the pageserver, by
|
||||
bespoken Rust code, and are not sent over to the WAL redo process.
|
||||
This includes SLRU-related WAL records, like commit records. SLRUs
|
||||
don't use the standard Postgres buffer manager, so dealing with them
|
||||
in the Neon WAL redo mode would require quite a few changes to
|
||||
Postgres code and special handling in the protocol anyway.
|
||||
|
||||
Some record types that include a full-page-image (e.g. XLOG_FPI) are
|
||||
also handled specially when incoming WAL is processed already, and are
|
||||
stored as page images rather than WAL records.
|
||||
|
||||
|
||||
## Records that modify multiple pages
|
||||
|
||||
Some Postgres WAL records modify multiple pages. Such WAL records are
|
||||
duplicated, so that a copy is stored for each affected page. This is
|
||||
somewhat wasteful, but because most WAL records only affect one page,
|
||||
the overhead is acceptable.
|
||||
|
||||
The WAL redo always happens for one particular page. If the WAL record
|
||||
coantains changes to other pages, they are ignored.
|
||||
@@ -1,11 +0,0 @@
|
||||
# Page server architecture
|
||||
|
||||
The Page Server has a few different duties:
|
||||
|
||||
- Respond to GetPage@LSN requests from the Compute Nodes
|
||||
- Receive WAL from WAL safekeeper, and store it
|
||||
- Upload data to S3 to make it durable, download files from S3 as needed
|
||||
|
||||
S3 is the main fault-tolerant storage of all data, as there are no Page Server
|
||||
replicas. We use a separate fault-tolerant WAL service to reduce latency. It
|
||||
keeps track of WAL records which are not synced to S3 yet.
|
||||
@@ -1,8 +0,0 @@
|
||||
# Separation of Compute and Storage
|
||||
|
||||
TODO:
|
||||
|
||||
- Read path
|
||||
- Write path
|
||||
- Durability model
|
||||
- API auth
|
||||
@@ -15,7 +15,6 @@ use crate::XLogPageHeaderData;
|
||||
use crate::XLogRecord;
|
||||
use crate::XLOG_PAGE_MAGIC;
|
||||
|
||||
use crate::pg_constants::WAL_SEGMENT_SIZE;
|
||||
use anyhow::{bail, ensure};
|
||||
use byteorder::{ByteOrder, LittleEndian};
|
||||
use bytes::BytesMut;
|
||||
@@ -462,7 +461,8 @@ pub fn find_end_of_wal(
|
||||
pub fn main() {
|
||||
let mut data_dir = PathBuf::new();
|
||||
data_dir.push(".");
|
||||
let (wal_end, tli) = find_end_of_wal(&data_dir, WAL_SEGMENT_SIZE, true, Lsn(0)).unwrap();
|
||||
let wal_seg_size = 16 * 1024 * 1024;
|
||||
let (wal_end, tli) = find_end_of_wal(&data_dir, wal_seg_size, true, Lsn(0)).unwrap();
|
||||
println!(
|
||||
"wal_end={:>08X}{:>08X}, tli={}",
|
||||
(wal_end >> 32) as u32,
|
||||
@@ -606,9 +606,10 @@ mod tests {
|
||||
fn test_end_of_wal<C: wal_craft::Crafter>(
|
||||
test_name: &str,
|
||||
expected_end_of_wal_non_partial: Lsn,
|
||||
last_segment: &str,
|
||||
) {
|
||||
use wal_craft::*;
|
||||
// Craft some WAL
|
||||
// 1. Generate some WAL
|
||||
let top_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
|
||||
.join("..")
|
||||
.join("..");
|
||||
@@ -621,71 +622,24 @@ mod tests {
|
||||
}
|
||||
cfg.initdb().unwrap();
|
||||
let srv = cfg.start_server().unwrap();
|
||||
let (intermediate_lsns, expected_end_of_wal_partial) =
|
||||
C::craft(&mut srv.connect_with_timeout().unwrap()).unwrap();
|
||||
let intermediate_lsns: Vec<Lsn> = intermediate_lsns
|
||||
.iter()
|
||||
.map(|&lsn| u64::from(lsn).into())
|
||||
.collect();
|
||||
let expected_end_of_wal_partial: Lsn = u64::from(expected_end_of_wal_partial).into();
|
||||
let expected_wal_end: Lsn =
|
||||
u64::from(C::craft(&mut srv.connect_with_timeout().unwrap()).unwrap()).into();
|
||||
srv.kill();
|
||||
|
||||
// Check find_end_of_wal on the initial WAL
|
||||
let last_segment = cfg
|
||||
.wal_dir()
|
||||
.read_dir()
|
||||
.unwrap()
|
||||
.map(|f| f.unwrap().file_name().into_string().unwrap())
|
||||
.filter(|fname| IsXLogFileName(fname))
|
||||
.max()
|
||||
.unwrap();
|
||||
check_pg_waldump_end_of_wal(&cfg, &last_segment, expected_end_of_wal_partial);
|
||||
for start_lsn in std::iter::once(Lsn(0))
|
||||
.chain(intermediate_lsns)
|
||||
.chain(std::iter::once(expected_end_of_wal_partial))
|
||||
{
|
||||
// Erase all WAL before `start_lsn` to ensure it's not used by `find_end_of_wal`.
|
||||
// We assume that `start_lsn` is non-decreasing.
|
||||
info!(
|
||||
"Checking with start_lsn={}, erasing WAL before it",
|
||||
start_lsn
|
||||
);
|
||||
for file in fs::read_dir(cfg.wal_dir()).unwrap().flatten() {
|
||||
let fname = file.file_name().into_string().unwrap();
|
||||
if !IsXLogFileName(&fname) {
|
||||
continue;
|
||||
}
|
||||
let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE);
|
||||
let seg_start_lsn = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
|
||||
if seg_start_lsn > u64::from(start_lsn) {
|
||||
continue;
|
||||
}
|
||||
let mut f = File::options().write(true).open(file.path()).unwrap();
|
||||
const ZEROS: [u8; WAL_SEGMENT_SIZE] = [0u8; WAL_SEGMENT_SIZE];
|
||||
f.write_all(
|
||||
&ZEROS[0..min(
|
||||
WAL_SEGMENT_SIZE,
|
||||
(u64::from(start_lsn) - seg_start_lsn) as usize,
|
||||
)],
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
check_end_of_wal(
|
||||
&cfg,
|
||||
&last_segment,
|
||||
start_lsn,
|
||||
expected_end_of_wal_non_partial,
|
||||
expected_end_of_wal_partial,
|
||||
);
|
||||
}
|
||||
}
|
||||
// 2. Pick WAL generated by initdb
|
||||
let wal_dir = cfg.datadir.join("pg_wal");
|
||||
let wal_seg_size = 16 * 1024 * 1024;
|
||||
|
||||
fn check_pg_waldump_end_of_wal(
|
||||
cfg: &wal_craft::Conf,
|
||||
last_segment: &str,
|
||||
expected_end_of_wal: Lsn,
|
||||
) {
|
||||
// Get the actual end of WAL by pg_waldump
|
||||
// 3. Check end_of_wal on non-partial WAL segment (we treat it as fully populated)
|
||||
let (wal_end, tli) = find_end_of_wal(&wal_dir, wal_seg_size, true, Lsn(0)).unwrap();
|
||||
let wal_end = Lsn(wal_end);
|
||||
info!(
|
||||
"find_end_of_wal returned (wal_end={}, tli={})",
|
||||
wal_end, tli
|
||||
);
|
||||
assert_eq!(wal_end, expected_end_of_wal_non_partial);
|
||||
|
||||
// 4. Get the actual end of WAL by pg_waldump
|
||||
let waldump_output = cfg
|
||||
.pg_waldump("000000010000000000000001", last_segment)
|
||||
.unwrap()
|
||||
@@ -704,57 +658,32 @@ mod tests {
|
||||
let waldump_wal_end = Lsn::from_str(caps.get(1).unwrap().as_str()).unwrap();
|
||||
info!(
|
||||
"waldump erred on {}, expected wal end at {}",
|
||||
waldump_wal_end, expected_end_of_wal
|
||||
waldump_wal_end, expected_wal_end
|
||||
);
|
||||
assert_eq!(waldump_wal_end, expected_end_of_wal);
|
||||
}
|
||||
assert_eq!(waldump_wal_end, expected_wal_end);
|
||||
|
||||
fn check_end_of_wal(
|
||||
cfg: &wal_craft::Conf,
|
||||
last_segment: &str,
|
||||
start_lsn: Lsn,
|
||||
expected_end_of_wal_non_partial: Lsn,
|
||||
expected_end_of_wal_partial: Lsn,
|
||||
) {
|
||||
// Check end_of_wal on non-partial WAL segment (we treat it as fully populated)
|
||||
let (wal_end, tli) =
|
||||
find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, true, start_lsn).unwrap();
|
||||
let wal_end = Lsn(wal_end);
|
||||
info!(
|
||||
"find_end_of_wal returned (wal_end={}, tli={}) with non-partial WAL segment",
|
||||
wal_end, tli
|
||||
);
|
||||
assert_eq!(wal_end, expected_end_of_wal_non_partial);
|
||||
|
||||
// Rename file to partial to actually find last valid lsn, then rename it back.
|
||||
// 5. Rename file to partial to actually find last valid lsn
|
||||
fs::rename(
|
||||
cfg.wal_dir().join(&last_segment),
|
||||
cfg.wal_dir().join(format!("{}.partial", last_segment)),
|
||||
wal_dir.join(last_segment),
|
||||
wal_dir.join(format!("{}.partial", last_segment)),
|
||||
)
|
||||
.unwrap();
|
||||
let (wal_end, tli) =
|
||||
find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, true, start_lsn).unwrap();
|
||||
let (wal_end, tli) = find_end_of_wal(&wal_dir, wal_seg_size, true, Lsn(0)).unwrap();
|
||||
let wal_end = Lsn(wal_end);
|
||||
info!(
|
||||
"find_end_of_wal returned (wal_end={}, tli={}) with partial WAL segment",
|
||||
"find_end_of_wal returned (wal_end={}, tli={})",
|
||||
wal_end, tli
|
||||
);
|
||||
assert_eq!(wal_end, expected_end_of_wal_partial);
|
||||
fs::rename(
|
||||
cfg.wal_dir().join(format!("{}.partial", last_segment)),
|
||||
cfg.wal_dir().join(last_segment),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(wal_end, waldump_wal_end);
|
||||
}
|
||||
|
||||
const_assert!(WAL_SEGMENT_SIZE == 16 * 1024 * 1024);
|
||||
|
||||
#[test]
|
||||
pub fn test_find_end_of_wal_simple() {
|
||||
init_logging();
|
||||
test_end_of_wal::<wal_craft::Simple>(
|
||||
"test_find_end_of_wal_simple",
|
||||
"0/2000000".parse::<Lsn>().unwrap(),
|
||||
"000000010000000000000001",
|
||||
);
|
||||
}
|
||||
|
||||
@@ -764,6 +693,7 @@ mod tests {
|
||||
test_end_of_wal::<wal_craft::WalRecordCrossingSegmentFollowedBySmallOne>(
|
||||
"test_find_end_of_wal_crossing_segment_followed_by_small_one",
|
||||
"0/3000000".parse::<Lsn>().unwrap(),
|
||||
"000000010000000000000002",
|
||||
);
|
||||
}
|
||||
|
||||
@@ -774,6 +704,7 @@ mod tests {
|
||||
test_end_of_wal::<wal_craft::LastWalRecordCrossingSegment>(
|
||||
"test_find_end_of_wal_last_crossing_segment",
|
||||
"0/3000000".parse::<Lsn>().unwrap(),
|
||||
"000000010000000000000002",
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -55,7 +55,7 @@ fn main() -> Result<()> {
|
||||
.get_matches();
|
||||
|
||||
let wal_craft = |arg_matches: &ArgMatches, client| {
|
||||
let (intermediate_lsns, end_of_wal_lsn) = match arg_matches.value_of("type").unwrap() {
|
||||
let lsn = match arg_matches.value_of("type").unwrap() {
|
||||
Simple::NAME => Simple::craft(client)?,
|
||||
LastWalRecordXlogSwitch::NAME => LastWalRecordXlogSwitch::craft(client)?,
|
||||
LastWalRecordXlogSwitchEndsOnPageBoundary::NAME => {
|
||||
@@ -67,10 +67,7 @@ fn main() -> Result<()> {
|
||||
LastWalRecordCrossingSegment::NAME => LastWalRecordCrossingSegment::craft(client)?,
|
||||
a => panic!("Unknown --type argument: {}", a),
|
||||
};
|
||||
for lsn in intermediate_lsns {
|
||||
println!("intermediate_lsn = {}", lsn);
|
||||
}
|
||||
println!("end_of_wal = {}", end_of_wal_lsn);
|
||||
println!("end_of_wal = {}", lsn);
|
||||
Ok(())
|
||||
};
|
||||
|
||||
|
||||
@@ -4,7 +4,6 @@ use log::*;
|
||||
use once_cell::sync::Lazy;
|
||||
use postgres::types::PgLsn;
|
||||
use postgres::Client;
|
||||
use postgres_ffi::pg_constants::WAL_SEGMENT_SIZE;
|
||||
use postgres_ffi::xlog_utils::{
|
||||
XLOG_BLCKSZ, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
|
||||
};
|
||||
@@ -46,10 +45,6 @@ impl Conf {
|
||||
self.pg_distrib_dir.join("lib")
|
||||
}
|
||||
|
||||
pub fn wal_dir(&self) -> PathBuf {
|
||||
self.datadir.join("pg_wal")
|
||||
}
|
||||
|
||||
fn new_pg_command(&self, command: impl AsRef<Path>) -> Result<Command> {
|
||||
let path = self.pg_bin_dir().join(command);
|
||||
ensure!(path.exists(), "Command {:?} does not exist", path);
|
||||
@@ -216,7 +211,7 @@ pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> Result
|
||||
"Unexpected wal_segment_size unit"
|
||||
);
|
||||
ensure!(
|
||||
wal_segment_size.get::<_, i64>("setting") == WAL_SEGMENT_SIZE as i64,
|
||||
wal_segment_size.get::<_, i64>("setting") == 16 * 1024 * 1024,
|
||||
"Unexpected wal_segment_size in bytes"
|
||||
);
|
||||
|
||||
@@ -226,24 +221,20 @@ pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> Result
|
||||
pub trait Crafter {
|
||||
const NAME: &'static str;
|
||||
|
||||
/// Generates WAL using the client `client`. Returns a pair of:
|
||||
/// * A vector of some valid "interesting" intermediate LSNs which one may start reading from.
|
||||
/// May include or exclude Lsn(0) and the end-of-wal.
|
||||
/// * The expected end-of-wal LSN.
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)>;
|
||||
/// Generates WAL using the client `client`. Returns the expected end-of-wal LSN.
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> Result<PgLsn>;
|
||||
}
|
||||
|
||||
fn craft_internal<C: postgres::GenericClient>(
|
||||
client: &mut C,
|
||||
f: impl Fn(&mut C, PgLsn) -> Result<(Vec<PgLsn>, Option<PgLsn>)>,
|
||||
) -> Result<(Vec<PgLsn>, PgLsn)> {
|
||||
f: impl Fn(&mut C, PgLsn) -> Result<Option<PgLsn>>,
|
||||
) -> Result<PgLsn> {
|
||||
ensure_server_config(client)?;
|
||||
|
||||
let initial_lsn = client.pg_current_wal_insert_lsn()?;
|
||||
info!("LSN initial = {}", initial_lsn);
|
||||
|
||||
let (mut intermediate_lsns, last_lsn) = f(client, initial_lsn)?;
|
||||
let last_lsn = match last_lsn {
|
||||
let last_lsn = match f(client, initial_lsn)? {
|
||||
None => client.pg_current_wal_insert_lsn()?,
|
||||
Some(last_lsn) => match last_lsn.cmp(&client.pg_current_wal_insert_lsn()?) {
|
||||
Ordering::Less => bail!("Some records were inserted after the crafted WAL"),
|
||||
@@ -251,9 +242,6 @@ fn craft_internal<C: postgres::GenericClient>(
|
||||
Ordering::Greater => bail!("Reported LSN is greater than insert_lsn"),
|
||||
},
|
||||
};
|
||||
if !intermediate_lsns.starts_with(&[initial_lsn]) {
|
||||
intermediate_lsns.insert(0, initial_lsn);
|
||||
}
|
||||
|
||||
// Some records may be not flushed, e.g. non-transactional logical messages.
|
||||
client.execute("select neon_xlogflush(pg_current_wal_insert_lsn())", &[])?;
|
||||
@@ -262,16 +250,16 @@ fn craft_internal<C: postgres::GenericClient>(
|
||||
Ordering::Equal => {}
|
||||
Ordering::Greater => bail!("Reported LSN is greater than flush_lsn"),
|
||||
}
|
||||
Ok((intermediate_lsns, last_lsn))
|
||||
Ok(last_lsn)
|
||||
}
|
||||
|
||||
pub struct Simple;
|
||||
impl Crafter for Simple {
|
||||
const NAME: &'static str = "simple";
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)> {
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> Result<PgLsn> {
|
||||
craft_internal(client, |client, _| {
|
||||
client.execute("CREATE table t(x int)", &[])?;
|
||||
Ok((Vec::new(), None))
|
||||
Ok(None)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -279,13 +267,12 @@ impl Crafter for Simple {
|
||||
pub struct LastWalRecordXlogSwitch;
|
||||
impl Crafter for LastWalRecordXlogSwitch {
|
||||
const NAME: &'static str = "last_wal_record_xlog_switch";
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)> {
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> Result<PgLsn> {
|
||||
// Do not use generate_internal because here we end up with flush_lsn exactly on
|
||||
// the segment boundary and insert_lsn after the initial page header, which is unusual.
|
||||
ensure_server_config(client)?;
|
||||
|
||||
client.execute("CREATE table t(x int)", &[])?;
|
||||
let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
|
||||
let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
|
||||
let next_segment = PgLsn::from(0x0200_0000);
|
||||
ensure!(
|
||||
@@ -294,14 +281,14 @@ impl Crafter for LastWalRecordXlogSwitch {
|
||||
after_xlog_switch,
|
||||
next_segment
|
||||
);
|
||||
Ok((vec![before_xlog_switch, after_xlog_switch], next_segment))
|
||||
Ok(next_segment)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LastWalRecordXlogSwitchEndsOnPageBoundary;
|
||||
impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
|
||||
const NAME: &'static str = "last_wal_record_xlog_switch_ends_on_page_boundary";
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)> {
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> Result<PgLsn> {
|
||||
// Do not use generate_internal because here we end up with flush_lsn exactly on
|
||||
// the segment boundary and insert_lsn after the initial page header, which is unusual.
|
||||
ensure_server_config(client)?;
|
||||
@@ -347,7 +334,6 @@ impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
|
||||
);
|
||||
|
||||
// Emit the XLOG_SWITCH
|
||||
let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
|
||||
let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
|
||||
let next_segment = PgLsn::from(0x0200_0000);
|
||||
ensure!(
|
||||
@@ -361,14 +347,14 @@ impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
|
||||
"XLOG_SWITCH message ended not on page boundary: {}",
|
||||
after_xlog_switch
|
||||
);
|
||||
Ok((vec![before_xlog_switch, after_xlog_switch], next_segment))
|
||||
Ok(next_segment)
|
||||
}
|
||||
}
|
||||
|
||||
fn craft_single_logical_message(
|
||||
client: &mut impl postgres::GenericClient,
|
||||
transactional: bool,
|
||||
) -> Result<(Vec<PgLsn>, PgLsn)> {
|
||||
) -> Result<PgLsn> {
|
||||
craft_internal(client, |client, initial_lsn| {
|
||||
ensure!(
|
||||
initial_lsn < PgLsn::from(0x0200_0000 - 1024 * 1024),
|
||||
@@ -400,9 +386,9 @@ fn craft_single_logical_message(
|
||||
message_lsn < after_message_lsn,
|
||||
"No record found after the emitted message"
|
||||
);
|
||||
Ok((vec![message_lsn], Some(after_message_lsn)))
|
||||
Ok(Some(after_message_lsn))
|
||||
} else {
|
||||
Ok((Vec::new(), Some(message_lsn)))
|
||||
Ok(Some(message_lsn))
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -410,7 +396,7 @@ fn craft_single_logical_message(
|
||||
pub struct WalRecordCrossingSegmentFollowedBySmallOne;
|
||||
impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
|
||||
const NAME: &'static str = "wal_record_crossing_segment_followed_by_small_one";
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)> {
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> Result<PgLsn> {
|
||||
craft_single_logical_message(client, true)
|
||||
}
|
||||
}
|
||||
@@ -418,7 +404,7 @@ impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
|
||||
pub struct LastWalRecordCrossingSegment;
|
||||
impl Crafter for LastWalRecordCrossingSegment {
|
||||
const NAME: &'static str = "last_wal_record_crossing_segment";
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)> {
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> Result<PgLsn> {
|
||||
craft_single_logical_message(client, false)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,15 @@
|
||||
# Services
|
||||
## Page server architecture
|
||||
|
||||
The Page Server has a few different duties:
|
||||
|
||||
- Respond to GetPage@LSN requests from the Compute Nodes
|
||||
- Receive WAL from WAL safekeeper
|
||||
- Replay WAL that's applicable to the chunks that the Page Server maintains
|
||||
- Backup to S3
|
||||
|
||||
S3 is the main fault-tolerant storage of all data, as there are no Page Server
|
||||
replicas. We use a separate fault-tolerant WAL service to reduce latency. It
|
||||
keeps track of WAL records which are not synced to S3 yet.
|
||||
|
||||
The Page Server consists of multiple threads that operate on a shared
|
||||
repository of page versions:
|
||||
@@ -10,22 +21,18 @@ repository of page versions:
|
||||
| WAL receiver |
|
||||
| |
|
||||
+--------------+
|
||||
......
|
||||
+---------+ +--------+ . .
|
||||
| | | | . .
|
||||
GetPage@LSN | | | backup | -------> . S3 .
|
||||
-------------> | Page | repository | | . .
|
||||
| Service | +--------+ . .
|
||||
page | | ......
|
||||
+----+
|
||||
+---------+ .......... | |
|
||||
| | . . | |
|
||||
GetPage@LSN | | . backup . -------> | S3 |
|
||||
-------------> | Page | repository . . | |
|
||||
| Service | .......... | |
|
||||
page | | +----+
|
||||
<------------- | |
|
||||
+---------+ +-----------+ +--------------------+
|
||||
| WAL redo | | Checkpointing, |
|
||||
+----------+ | processes | | Garbage collection |
|
||||
| | +-----------+ +--------------------+
|
||||
| HTTP |
|
||||
| mgmt API |
|
||||
| |
|
||||
+----------+
|
||||
+---------+ +--------------------+
|
||||
| Checkpointing / |
|
||||
| Garbage collection |
|
||||
+--------------------+
|
||||
|
||||
Legend:
|
||||
|
||||
@@ -33,77 +40,28 @@ Legend:
|
||||
| | A thread or multi-threaded service
|
||||
+--+
|
||||
|
||||
....
|
||||
. . Component at its early development phase.
|
||||
....
|
||||
|
||||
---> Data flow
|
||||
<---
|
||||
```
|
||||
|
||||
## Page Service
|
||||
Page Service
|
||||
------------
|
||||
|
||||
The Page Service listens for GetPage@LSN requests from the Compute Nodes,
|
||||
and responds with pages from the repository. On each GetPage@LSN request,
|
||||
it calls into the Repository function
|
||||
|
||||
A separate thread is spawned for each incoming connection to the page
|
||||
service. The page service uses the libpq protocol to communicate with
|
||||
the client. The client is a Compute Postgres instance.
|
||||
|
||||
## WAL Receiver
|
||||
|
||||
The WAL receiver connects to the external WAL safekeeping service
|
||||
using PostgreSQL physical streaming replication, and continuously
|
||||
receives WAL. It decodes the WAL records, and stores them to the
|
||||
repository.
|
||||
and responds with pages from the repository.
|
||||
|
||||
|
||||
## Backup service
|
||||
WAL Receiver
|
||||
------------
|
||||
|
||||
The backup service, responsible for storing pageserver recovery data externally.
|
||||
|
||||
Currently, pageserver stores its files in a filesystem directory it's pointed to.
|
||||
That working directory could be rather ephemeral for such cases as "a pageserver pod running in k8s with no persistent volumes attached".
|
||||
Therefore, the server interacts with external, more reliable storage to back up and restore its state.
|
||||
|
||||
The code for storage support is extensible and can support arbitrary ones as long as they implement a certain Rust trait.
|
||||
There are the following implementations present:
|
||||
* local filesystem — to use in tests mainly
|
||||
* AWS S3 - to use in production
|
||||
|
||||
Implementation details are covered in the [backup readme](./src/remote_storage/README.md) and corresponding Rust file docs, parameters documentation can be found at [settings docs](../docs/settings.md).
|
||||
|
||||
The backup service is disabled by default and can be enabled to interact with a single remote storage.
|
||||
|
||||
CLI examples:
|
||||
* Local FS: `${PAGESERVER_BIN} -c "remote_storage={local_path='/some/local/path/'}"`
|
||||
* AWS S3 : `env AWS_ACCESS_KEY_ID='SOMEKEYAAAAASADSAH*#' AWS_SECRET_ACCESS_KEY='SOMEsEcReTsd292v' ${PAGESERVER_BIN} -c "remote_storage={bucket_name='some-sample-bucket',bucket_region='eu-north-1', prefix_in_bucket='/test_prefix/'}"`
|
||||
|
||||
For Amazon AWS S3, a key id and secret access key could be located in `~/.aws/credentials` if awscli was ever configured to work with the desired bucket, on the AWS Settings page for a certain user. Also note, that the bucket names does not contain any protocols when used on AWS.
|
||||
For local S3 installations, refer to the their documentation for name format and credentials.
|
||||
|
||||
Similar to other pageserver settings, toml config file can be used to configure either of the storages as backup targets.
|
||||
Required sections are:
|
||||
|
||||
```toml
|
||||
[remote_storage]
|
||||
local_path = '/Users/someonetoignore/Downloads/tmp_dir/'
|
||||
```
|
||||
|
||||
or
|
||||
|
||||
```toml
|
||||
[remote_storage]
|
||||
bucket_name = 'some-sample-bucket'
|
||||
bucket_region = 'eu-north-1'
|
||||
prefix_in_bucket = '/test_prefix/'
|
||||
```
|
||||
|
||||
`AWS_SECRET_ACCESS_KEY` and `AWS_ACCESS_KEY_ID` env variables can be used to specify the S3 credentials if needed.
|
||||
|
||||
|
||||
## Repository background tasks
|
||||
|
||||
The Repository also has a few different background threads and tokio tasks that perform
|
||||
background duties like dumping accumulated WAL data from memory to disk, reorganizing
|
||||
files for performance (compaction), and garbage collecting old files.
|
||||
The WAL receiver connects to the external WAL safekeeping service (or
|
||||
directly to the primary) using PostgreSQL physical streaming
|
||||
replication, and continuously receives WAL. It decodes the WAL records,
|
||||
and stores them to the repository.
|
||||
|
||||
|
||||
Repository
|
||||
@@ -158,6 +116,48 @@ Remove old on-disk layer files that are no longer needed according to the
|
||||
PITR retention policy
|
||||
|
||||
|
||||
### Backup service
|
||||
|
||||
The backup service, responsible for storing pageserver recovery data externally.
|
||||
|
||||
Currently, pageserver stores its files in a filesystem directory it's pointed to.
|
||||
That working directory could be rather ephemeral for such cases as "a pageserver pod running in k8s with no persistent volumes attached".
|
||||
Therefore, the server interacts with external, more reliable storage to back up and restore its state.
|
||||
|
||||
The code for storage support is extensible and can support arbitrary ones as long as they implement a certain Rust trait.
|
||||
There are the following implementations present:
|
||||
* local filesystem — to use in tests mainly
|
||||
* AWS S3 - to use in production
|
||||
|
||||
Implementation details are covered in the [backup readme](./src/remote_storage/README.md) and corresponding Rust file docs, parameters documentation can be found at [settings docs](../docs/settings.md).
|
||||
|
||||
The backup service is disabled by default and can be enabled to interact with a single remote storage.
|
||||
|
||||
CLI examples:
|
||||
* Local FS: `${PAGESERVER_BIN} -c "remote_storage={local_path='/some/local/path/'}"`
|
||||
* AWS S3 : `env AWS_ACCESS_KEY_ID='SOMEKEYAAAAASADSAH*#' AWS_SECRET_ACCESS_KEY='SOMEsEcReTsd292v' ${PAGESERVER_BIN} -c "remote_storage={bucket_name='some-sample-bucket',bucket_region='eu-north-1', prefix_in_bucket='/test_prefix/'}"`
|
||||
|
||||
For Amazon AWS S3, a key id and secret access key could be located in `~/.aws/credentials` if awscli was ever configured to work with the desired bucket, on the AWS Settings page for a certain user. Also note, that the bucket names does not contain any protocols when used on AWS.
|
||||
For local S3 installations, refer to the their documentation for name format and credentials.
|
||||
|
||||
Similar to other pageserver settings, toml config file can be used to configure either of the storages as backup targets.
|
||||
Required sections are:
|
||||
|
||||
```toml
|
||||
[remote_storage]
|
||||
local_path = '/Users/someonetoignore/Downloads/tmp_dir/'
|
||||
```
|
||||
|
||||
or
|
||||
|
||||
```toml
|
||||
[remote_storage]
|
||||
bucket_name = 'some-sample-bucket'
|
||||
bucket_region = 'eu-north-1'
|
||||
prefix_in_bucket = '/test_prefix/'
|
||||
```
|
||||
|
||||
`AWS_SECRET_ACCESS_KEY` and `AWS_ACCESS_KEY_ID` env variables can be used to specify the S3 credentials if needed.
|
||||
|
||||
TODO: Sharding
|
||||
--------------------
|
||||
@@ -1768,23 +1768,24 @@ impl LayeredTimeline {
|
||||
|
||||
/// Flush one frozen in-memory layer to disk, as a new delta layer.
|
||||
fn flush_frozen_layer(&self, frozen_layer: Arc<InMemoryLayer>) -> Result<()> {
|
||||
let layer_paths_to_upload;
|
||||
|
||||
// As a special case, when we have just imported an image into the repository,
|
||||
// instead of writing out a L0 delta layer, we directly write out image layer
|
||||
// files instead. This is possible as long as *all* the data imported into the
|
||||
// repository have the same LSN.
|
||||
let lsn_range = frozen_layer.get_lsn_range();
|
||||
let layer_paths_to_upload = if lsn_range.start == self.initdb_lsn
|
||||
&& lsn_range.end == Lsn(self.initdb_lsn.0 + 1)
|
||||
{
|
||||
if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
|
||||
let pgdir = tenant_mgr::get_local_timeline_with_load(self.tenant_id, self.timeline_id)?;
|
||||
let (partitioning, _lsn) =
|
||||
pgdir.repartition(self.initdb_lsn, self.get_compaction_target_size())?;
|
||||
self.create_image_layers(&partitioning, self.initdb_lsn, true)?
|
||||
layer_paths_to_upload =
|
||||
self.create_image_layers(&partitioning, self.initdb_lsn, true)?;
|
||||
} else {
|
||||
// normal case, write out a L0 delta layer file.
|
||||
let delta_path = self.create_delta_layer(&frozen_layer)?;
|
||||
HashSet::from([delta_path])
|
||||
};
|
||||
layer_paths_to_upload = HashSet::from([delta_path]);
|
||||
}
|
||||
|
||||
fail_point!("flush-frozen-before-sync");
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
# Pageserver storage
|
||||
# Overview
|
||||
|
||||
The main responsibility of the Page Server is to process the incoming WAL, and
|
||||
reprocess it into a format that allows reasonably quick access to any page
|
||||
@@ -928,7 +928,7 @@ fn storage_sync_loop<P, S>(
|
||||
);
|
||||
let mut sync_status_updates: HashMap<ZTenantId, HashSet<ZTimelineId>> =
|
||||
HashMap::new();
|
||||
let index_accessor = runtime.block_on(index.read());
|
||||
let index_accessor = runtime.block_on(index.write());
|
||||
for tenant_id in updated_tenants {
|
||||
let tenant_entry = match index_accessor.tenant_entry(&tenant_id) {
|
||||
Some(tenant_entry) => tenant_entry,
|
||||
@@ -1557,7 +1557,6 @@ fn schedule_first_sync_tasks(
|
||||
local_timeline_init_statuses
|
||||
}
|
||||
|
||||
/// bool in return value stands for awaits_download
|
||||
fn compare_local_and_remote_timeline(
|
||||
new_sync_tasks: &mut VecDeque<(ZTenantTimelineId, SyncTask)>,
|
||||
sync_id: ZTenantTimelineId,
|
||||
@@ -1567,6 +1566,14 @@ fn compare_local_and_remote_timeline(
|
||||
) -> (LocalTimelineInitStatus, bool) {
|
||||
let remote_files = remote_entry.stored_files();
|
||||
|
||||
// TODO probably here we need more sophisticated logic,
|
||||
// if more data is available remotely can we just download what's there?
|
||||
// without trying to upload something. It may be tricky, needs further investigation.
|
||||
// For now looks strange that we can request upload
|
||||
// and download for the same timeline simultaneously.
|
||||
// (upload needs to be only for previously unsynced files, not whole timeline dir).
|
||||
// If one of the tasks fails they will be reordered in the queue which can lead
|
||||
// to timeline being stuck in evicted state
|
||||
let number_of_layers_to_download = remote_files.difference(&local_files).count();
|
||||
let (initial_timeline_status, awaits_download) = if number_of_layers_to_download > 0 {
|
||||
new_sync_tasks.push_back((
|
||||
|
||||
@@ -3,13 +3,12 @@
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
fmt::Debug,
|
||||
mem,
|
||||
path::Path,
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use remote_storage::{path_with_suffix_extension, DownloadError, RemoteObjectName, RemoteStorage};
|
||||
use remote_storage::{path_with_suffix_extension, RemoteObjectName, RemoteStorage};
|
||||
use tokio::{
|
||||
fs,
|
||||
io::{self, AsyncWriteExt},
|
||||
@@ -28,50 +27,28 @@ use super::{
|
||||
|
||||
pub const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
|
||||
|
||||
// We collect timelines remotely available for each tenant
|
||||
// in case we failed to gather all index parts (due to an error)
|
||||
// Poisoned variant is returned.
|
||||
// When data is received succesfully without errors Present variant is used.
|
||||
pub enum TenantIndexParts {
|
||||
Poisoned {
|
||||
present: HashMap<ZTimelineId, IndexPart>,
|
||||
missing: HashSet<ZTimelineId>,
|
||||
},
|
||||
Present(HashMap<ZTimelineId, IndexPart>),
|
||||
}
|
||||
|
||||
impl TenantIndexParts {
|
||||
fn add_poisoned(&mut self, timeline_id: ZTimelineId) {
|
||||
match self {
|
||||
TenantIndexParts::Poisoned { missing, .. } => {
|
||||
missing.insert(timeline_id);
|
||||
}
|
||||
TenantIndexParts::Present(present) => {
|
||||
*self = TenantIndexParts::Poisoned {
|
||||
present: mem::take(present),
|
||||
missing: HashSet::from([timeline_id]),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for TenantIndexParts {
|
||||
fn default() -> Self {
|
||||
TenantIndexParts::Present(HashMap::default())
|
||||
}
|
||||
}
|
||||
|
||||
/// FIXME: Needs cleanup. Currently it swallows errors. Here we need to ensure that
|
||||
/// we successfully downloaded all metadata parts for one tenant.
|
||||
/// And successful includes absence of index_part in the remote. Because it is valid situation
|
||||
/// when timeline was just created and pageserver restarted before upload of index part was completed.
|
||||
/// But currently RemoteStorage interface does not provide this knowledge because it uses
|
||||
/// anyhow::Error as an error type. So this needs a refactoring.
|
||||
///
|
||||
/// In other words we need to yield only complete sets of tenant timelines.
|
||||
/// Failure for one timeline of a tenant should exclude whole tenant from returned hashmap.
|
||||
/// So there are two requirements: keep everything in one futures unordered
|
||||
/// to allow higher concurrency. Mark tenants as failed independently.
|
||||
/// That requires some bookeeping.
|
||||
pub async fn download_index_parts<P, S>(
|
||||
conf: &'static PageServerConf,
|
||||
storage: &S,
|
||||
keys: HashSet<ZTenantTimelineId>,
|
||||
) -> HashMap<ZTenantId, TenantIndexParts>
|
||||
) -> HashMap<ZTenantId, HashMap<ZTimelineId, IndexPart>>
|
||||
where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
let mut index_parts: HashMap<ZTenantId, TenantIndexParts> = HashMap::new();
|
||||
let mut index_parts: HashMap<ZTenantId, HashMap<ZTimelineId, IndexPart>> = HashMap::new();
|
||||
|
||||
let mut part_downloads = keys
|
||||
.into_iter()
|
||||
@@ -82,29 +59,12 @@ where
|
||||
match part_upload_result {
|
||||
Ok(index_part) => {
|
||||
debug!("Successfully fetched index part for {id}");
|
||||
match index_parts.entry(id.tenant_id).or_default() {
|
||||
TenantIndexParts::Poisoned { present, .. } => {
|
||||
present.insert(id.timeline_id, index_part);
|
||||
}
|
||||
TenantIndexParts::Present(parts) => {
|
||||
parts.insert(id.timeline_id, index_part);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(download_error) => {
|
||||
match download_error {
|
||||
DownloadError::NotFound => {
|
||||
// thats ok because it means that we didnt upload something we have locally for example
|
||||
}
|
||||
e => {
|
||||
let tenant_parts = index_parts.entry(id.tenant_id).or_default();
|
||||
tenant_parts.add_poisoned(id.timeline_id);
|
||||
error!(
|
||||
"Failed to fetch index part for {id}: {e} poisoning tenant index parts"
|
||||
);
|
||||
}
|
||||
}
|
||||
index_parts
|
||||
.entry(id.tenant_id)
|
||||
.or_default()
|
||||
.insert(id.timeline_id, index_part);
|
||||
}
|
||||
Err(e) => error!("Failed to fetch index part for {id}: {e}"),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -159,16 +119,12 @@ where
|
||||
});
|
||||
}
|
||||
|
||||
match download_index_parts(conf, storage, sync_ids)
|
||||
download_index_parts(conf, storage, sync_ids)
|
||||
.await
|
||||
.remove(&tenant_id)
|
||||
.ok_or_else(|| anyhow::anyhow!("Missing tenant index parts. This is a bug."))?
|
||||
{
|
||||
TenantIndexParts::Poisoned { missing, .. } => {
|
||||
anyhow::bail!("Failed to download index parts for all timelines. Missing {missing:?}")
|
||||
}
|
||||
TenantIndexParts::Present(parts) => Ok(parts),
|
||||
}
|
||||
.ok_or(anyhow::anyhow!(
|
||||
"Missing tenant index parts. This is a bug."
|
||||
))
|
||||
}
|
||||
|
||||
/// Retrieves index data from the remote storage for a given timeline.
|
||||
@@ -176,7 +132,7 @@ async fn download_index_part<P, S>(
|
||||
conf: &'static PageServerConf,
|
||||
storage: &S,
|
||||
sync_id: ZTenantTimelineId,
|
||||
) -> Result<IndexPart, DownloadError>
|
||||
) -> anyhow::Result<IndexPart>
|
||||
where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
@@ -191,11 +147,15 @@ where
|
||||
"Failed to get the index part storage path for local path '{}'",
|
||||
index_part_path.display()
|
||||
)
|
||||
})
|
||||
.map_err(DownloadError::BadInput)?;
|
||||
|
||||
let mut index_part_download = storage.download(&part_storage_path).await?;
|
||||
})?;
|
||||
|
||||
let mut index_part_download =
|
||||
storage
|
||||
.download(&part_storage_path)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("Failed to open download stream for for storage path {part_storage_path:?}")
|
||||
})?;
|
||||
let mut index_part_bytes = Vec::new();
|
||||
io::copy(
|
||||
&mut index_part_download.download_stream,
|
||||
@@ -204,16 +164,11 @@ where
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("Failed to download an index part from storage path {part_storage_path:?}")
|
||||
})
|
||||
.map_err(DownloadError::Other)?;
|
||||
})?;
|
||||
|
||||
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to deserialize index part file from storage path '{part_storage_path:?}'"
|
||||
)
|
||||
})
|
||||
.map_err(DownloadError::Other)?;
|
||||
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes).with_context(|| {
|
||||
format!("Failed to deserialize index part file from storage path '{part_storage_path:?}'")
|
||||
})?;
|
||||
|
||||
let missing_files = index_part.missing_files();
|
||||
if !missing_files.is_empty() {
|
||||
|
||||
@@ -13,7 +13,6 @@ use anyhow::{anyhow, Context, Ok};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::{serde_as, DisplayFromStr};
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::log::warn;
|
||||
|
||||
use crate::{config::PageServerConf, layered_repository::metadata::TimelineMetadata};
|
||||
use utils::{
|
||||
@@ -21,8 +20,6 @@ use utils::{
|
||||
zid::{ZTenantId, ZTenantTimelineId, ZTimelineId},
|
||||
};
|
||||
|
||||
use super::download::TenantIndexParts;
|
||||
|
||||
/// A part of the filesystem path, that needs a root to become a path again.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
|
||||
#[serde(transparent)]
|
||||
@@ -91,27 +88,21 @@ pub struct RemoteIndex(Arc<RwLock<RemoteTimelineIndex>>);
|
||||
impl RemoteIndex {
|
||||
pub fn from_parts(
|
||||
conf: &'static PageServerConf,
|
||||
index_parts: HashMap<ZTenantId, TenantIndexParts>,
|
||||
index_parts: HashMap<ZTenantId, HashMap<ZTimelineId, IndexPart>>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let mut entries: HashMap<ZTenantId, TenantEntry> = HashMap::new();
|
||||
|
||||
for (tenant_id, index_parts) in index_parts {
|
||||
match index_parts {
|
||||
// TODO: should we schedule a retry so it can be recovered? otherwise we can revive it only through detach/attach or pageserver restart
|
||||
TenantIndexParts::Poisoned { missing, ..} => warn!("skipping tenant_id set up for remote index because the index download has failed for timeline(s): {missing:?}"),
|
||||
TenantIndexParts::Present(timelines) => {
|
||||
for (timeline_id, index_part) in timelines {
|
||||
let timeline_path = conf.timeline_path(&timeline_id, &tenant_id);
|
||||
let remote_timeline =
|
||||
RemoteTimeline::from_index_part(&timeline_path, index_part)
|
||||
.context("Failed to restore remote timeline data from index part")?;
|
||||
for (tenant_id, timelines) in index_parts {
|
||||
for (timeline_id, index_part) in timelines {
|
||||
let timeline_path = conf.timeline_path(&timeline_id, &tenant_id);
|
||||
let remote_timeline =
|
||||
RemoteTimeline::from_index_part(&timeline_path, index_part)
|
||||
.context("Failed to restore remote timeline data from index part")?;
|
||||
|
||||
entries
|
||||
.entry(tenant_id)
|
||||
.or_default()
|
||||
.insert(timeline_id, remote_timeline);
|
||||
}
|
||||
},
|
||||
entries
|
||||
.entry(tenant_id)
|
||||
.or_default()
|
||||
.insert(timeline_id, remote_timeline);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
52
poetry.lock
generated
52
poetry.lock
generated
@@ -544,21 +544,20 @@ test = ["pytest (>=6.2.0)", "pytest-cov", "pytest-subtests", "pytest-xdist", "pr
|
||||
|
||||
[[package]]
|
||||
name = "docker"
|
||||
version = "4.2.2"
|
||||
version = "5.0.3"
|
||||
description = "A Python library for the Docker Engine API."
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
|
||||
python-versions = ">=3.6"
|
||||
|
||||
[package.dependencies]
|
||||
pypiwin32 = {version = "223", markers = "sys_platform == \"win32\" and python_version >= \"3.6\""}
|
||||
pywin32 = {version = "227", markers = "sys_platform == \"win32\""}
|
||||
requests = ">=2.14.2,<2.18.0 || >2.18.0"
|
||||
six = ">=1.4.0"
|
||||
websocket-client = ">=0.32.0"
|
||||
|
||||
[package.extras]
|
||||
ssh = ["paramiko (>=2.4.2)"]
|
||||
tls = ["pyOpenSSL (>=17.5.0)", "cryptography (>=1.3.4)", "idna (>=2.0.0)"]
|
||||
tls = ["pyOpenSSL (>=17.5.0)", "cryptography (>=3.4.7)", "idna (>=2.0.0)"]
|
||||
|
||||
[[package]]
|
||||
name = "ecdsa"
|
||||
@@ -1004,17 +1003,6 @@ python-versions = ">=3.6"
|
||||
[package.extras]
|
||||
diagrams = ["jinja2", "railroad-diagrams"]
|
||||
|
||||
[[package]]
|
||||
name = "pypiwin32"
|
||||
version = "223"
|
||||
description = ""
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
|
||||
[package.dependencies]
|
||||
pywin32 = ">=223"
|
||||
|
||||
[[package]]
|
||||
name = "pyrsistent"
|
||||
version = "0.18.1"
|
||||
@@ -1136,7 +1124,7 @@ python-versions = "*"
|
||||
|
||||
[[package]]
|
||||
name = "pywin32"
|
||||
version = "301"
|
||||
version = "227"
|
||||
description = "Python for Window Extensions"
|
||||
category = "main"
|
||||
optional = false
|
||||
@@ -1513,8 +1501,8 @@ cryptography = [
|
||||
{file = "cryptography-36.0.1.tar.gz", hash = "sha256:53e5c1dc3d7a953de055d77bef2ff607ceef7a2aac0353b5d630ab67f7423638"},
|
||||
]
|
||||
docker = [
|
||||
{file = "docker-4.2.2-py2.py3-none-any.whl", hash = "sha256:03a46400c4080cb6f7aa997f881ddd84fef855499ece219d75fbdb53289c17ab"},
|
||||
{file = "docker-4.2.2.tar.gz", hash = "sha256:26eebadce7e298f55b76a88c4f8802476c5eaddbdbe38dbc6cce8781c47c9b54"},
|
||||
{file = "docker-5.0.3-py2.py3-none-any.whl", hash = "sha256:7a79bb439e3df59d0a72621775d600bc8bc8b422d285824cb37103eab91d1ce0"},
|
||||
{file = "docker-5.0.3.tar.gz", hash = "sha256:d916a26b62970e7c2f554110ed6af04c7ccff8e9f81ad17d0d40c75637e227fb"},
|
||||
]
|
||||
ecdsa = [
|
||||
{file = "ecdsa-0.17.0-py2.py3-none-any.whl", hash = "sha256:5cf31d5b33743abe0dfc28999036c849a69d548f994b535e527ee3cb7f3ef676"},
|
||||
@@ -1814,10 +1802,6 @@ pyparsing = [
|
||||
{file = "pyparsing-3.0.6-py3-none-any.whl", hash = "sha256:04ff808a5b90911829c55c4e26f75fa5ca8a2f5f36aa3a51f68e27033341d3e4"},
|
||||
{file = "pyparsing-3.0.6.tar.gz", hash = "sha256:d9bdec0013ef1eb5a84ab39a3b3868911598afa494f5faa038647101504e2b81"},
|
||||
]
|
||||
pypiwin32 = [
|
||||
{file = "pypiwin32-223-py3-none-any.whl", hash = "sha256:67adf399debc1d5d14dffc1ab5acacb800da569754fafdc576b2a039485aa775"},
|
||||
{file = "pypiwin32-223.tar.gz", hash = "sha256:71be40c1fbd28594214ecaecb58e7aa8b708eabfa0125c8a109ebd51edbd776a"},
|
||||
]
|
||||
pyrsistent = [
|
||||
{file = "pyrsistent-0.18.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:df46c854f490f81210870e509818b729db4488e1f30f2a1ce1698b2295a878d1"},
|
||||
{file = "pyrsistent-0.18.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5d45866ececf4a5fff8742c25722da6d4c9e180daa7b405dc0a2a2790d668c26"},
|
||||
@@ -1874,16 +1858,18 @@ pytz = [
|
||||
{file = "pytz-2021.3.tar.gz", hash = "sha256:acad2d8b20a1af07d4e4c9d2e9285c5ed9104354062f275f3fcd88dcef4f1326"},
|
||||
]
|
||||
pywin32 = [
|
||||
{file = "pywin32-301-cp35-cp35m-win32.whl", hash = "sha256:93367c96e3a76dfe5003d8291ae16454ca7d84bb24d721e0b74a07610b7be4a7"},
|
||||
{file = "pywin32-301-cp35-cp35m-win_amd64.whl", hash = "sha256:9635df6998a70282bd36e7ac2a5cef9ead1627b0a63b17c731312c7a0daebb72"},
|
||||
{file = "pywin32-301-cp36-cp36m-win32.whl", hash = "sha256:c866f04a182a8cb9b7855de065113bbd2e40524f570db73ef1ee99ff0a5cc2f0"},
|
||||
{file = "pywin32-301-cp36-cp36m-win_amd64.whl", hash = "sha256:dafa18e95bf2a92f298fe9c582b0e205aca45c55f989937c52c454ce65b93c78"},
|
||||
{file = "pywin32-301-cp37-cp37m-win32.whl", hash = "sha256:98f62a3f60aa64894a290fb7494bfa0bfa0a199e9e052e1ac293b2ad3cd2818b"},
|
||||
{file = "pywin32-301-cp37-cp37m-win_amd64.whl", hash = "sha256:fb3b4933e0382ba49305cc6cd3fb18525df7fd96aa434de19ce0878133bf8e4a"},
|
||||
{file = "pywin32-301-cp38-cp38-win32.whl", hash = "sha256:88981dd3cfb07432625b180f49bf4e179fb8cbb5704cd512e38dd63636af7a17"},
|
||||
{file = "pywin32-301-cp38-cp38-win_amd64.whl", hash = "sha256:8c9d33968aa7fcddf44e47750e18f3d034c3e443a707688a008a2e52bbef7e96"},
|
||||
{file = "pywin32-301-cp39-cp39-win32.whl", hash = "sha256:595d397df65f1b2e0beaca63a883ae6d8b6df1cdea85c16ae85f6d2e648133fe"},
|
||||
{file = "pywin32-301-cp39-cp39-win_amd64.whl", hash = "sha256:87604a4087434cd814ad8973bd47d6524bd1fa9e971ce428e76b62a5e0860fdf"},
|
||||
{file = "pywin32-227-cp27-cp27m-win32.whl", hash = "sha256:371fcc39416d736401f0274dd64c2302728c9e034808e37381b5e1b22be4a6b0"},
|
||||
{file = "pywin32-227-cp27-cp27m-win_amd64.whl", hash = "sha256:4cdad3e84191194ea6d0dd1b1b9bdda574ff563177d2adf2b4efec2a244fa116"},
|
||||
{file = "pywin32-227-cp35-cp35m-win32.whl", hash = "sha256:f4c5be1a293bae0076d93c88f37ee8da68136744588bc5e2be2f299a34ceb7aa"},
|
||||
{file = "pywin32-227-cp35-cp35m-win_amd64.whl", hash = "sha256:a929a4af626e530383a579431b70e512e736e9588106715215bf685a3ea508d4"},
|
||||
{file = "pywin32-227-cp36-cp36m-win32.whl", hash = "sha256:300a2db938e98c3e7e2093e4491439e62287d0d493fe07cce110db070b54c0be"},
|
||||
{file = "pywin32-227-cp36-cp36m-win_amd64.whl", hash = "sha256:9b31e009564fb95db160f154e2aa195ed66bcc4c058ed72850d047141b36f3a2"},
|
||||
{file = "pywin32-227-cp37-cp37m-win32.whl", hash = "sha256:47a3c7551376a865dd8d095a98deba954a98f326c6fe3c72d8726ca6e6b15507"},
|
||||
{file = "pywin32-227-cp37-cp37m-win_amd64.whl", hash = "sha256:31f88a89139cb2adc40f8f0e65ee56a8c585f629974f9e07622ba80199057511"},
|
||||
{file = "pywin32-227-cp38-cp38-win32.whl", hash = "sha256:7f18199fbf29ca99dff10e1f09451582ae9e372a892ff03a28528a24d55875bc"},
|
||||
{file = "pywin32-227-cp38-cp38-win_amd64.whl", hash = "sha256:7c1ae32c489dc012930787f06244426f8356e129184a02c25aef163917ce158e"},
|
||||
{file = "pywin32-227-cp39-cp39-win32.whl", hash = "sha256:c054c52ba46e7eb6b7d7dfae4dbd987a1bb48ee86debe3f245a2884ece46e295"},
|
||||
{file = "pywin32-227-cp39-cp39-win_amd64.whl", hash = "sha256:f27cec5e7f588c3d1051651830ecc00294f90728d19c3bf6916e6dba93ea357c"},
|
||||
]
|
||||
pyyaml = [
|
||||
{file = "PyYAML-6.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:d4db7c7aef085872ef65a8fd7d6d09a14ae91f691dec3e87ee5ee0539d516f53"},
|
||||
|
||||
@@ -83,9 +83,7 @@ impl ElectionLeader {
|
||||
) -> Result<bool> {
|
||||
let resp = self.client.leader(election_name).await?;
|
||||
|
||||
let kv = resp
|
||||
.kv()
|
||||
.ok_or_else(|| anyhow!("failed to get leader response"))?;
|
||||
let kv = resp.kv().ok_or(anyhow!("failed to get leader response"))?;
|
||||
let leader = kv.value_str()?;
|
||||
|
||||
Ok(leader == candidate_name)
|
||||
|
||||
@@ -637,17 +637,6 @@ where
|
||||
&mut self,
|
||||
msg: &VoteRequest,
|
||||
) -> Result<Option<AcceptorProposerMessage>> {
|
||||
// Once voted, we won't accept data from older proposers; flush
|
||||
// everything we've already received so that new proposer starts
|
||||
// streaming at end of our WAL, without overlap. Currently we truncate
|
||||
// WAL at streaming point, so this avoids truncating already committed
|
||||
// WAL.
|
||||
//
|
||||
// TODO: it would be smoother to not truncate committed piece at
|
||||
// handle_elected instead. Currently not a big deal, as proposer is the
|
||||
// only source of WAL; with peer2peer recovery it would be more
|
||||
// important.
|
||||
self.wal_store.flush_wal()?;
|
||||
// initialize with refusal
|
||||
let mut resp = VoteResponse {
|
||||
term: self.state.acceptor_state.term,
|
||||
|
||||
@@ -44,7 +44,7 @@ def test_branching_with_pgbench(neon_simple_env: NeonEnv,
|
||||
log.info(f"Start a pgbench workload on pg {connstr}")
|
||||
|
||||
pg_bin.run_capture(['pgbench', '-i', f'-s{scale}', connstr])
|
||||
pg_bin.run_capture(['pgbench', '-T15', connstr])
|
||||
pg_bin.run_capture(['pgbench', '-c10', '-T15', connstr])
|
||||
|
||||
env.neon_cli.create_branch('b0', tenant_id=tenant)
|
||||
pgs: List[Postgres] = []
|
||||
@@ -54,23 +54,12 @@ def test_branching_with_pgbench(neon_simple_env: NeonEnv,
|
||||
threads.append(threading.Thread(target=run_pgbench, args=(pgs[0], ), daemon=True))
|
||||
threads[-1].start()
|
||||
|
||||
thread_limit = 4
|
||||
|
||||
for i in range(n_branches):
|
||||
# random a delay between [0, 5]
|
||||
delay = random.random() * 5
|
||||
time.sleep(delay)
|
||||
log.info(f"Sleep {delay}s")
|
||||
|
||||
# If the number of concurrent threads exceeds a threshold,
|
||||
# wait for all the threads to finish before spawning a new one.
|
||||
# Because tests defined in `batch_others` are run concurrently in CI,
|
||||
# we want to avoid the situation that one test exhausts resources for other tests.
|
||||
if len(threads) >= thread_limit:
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
threads = []
|
||||
|
||||
if ty == "cascade":
|
||||
env.neon_cli.create_branch('b{}'.format(i + 1), 'b{}'.format(i), tenant_id=tenant)
|
||||
else:
|
||||
|
||||
@@ -302,8 +302,6 @@ def test_compute_restarts(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
|
||||
class BackgroundCompute(object):
|
||||
MAX_QUERY_GAP_SECONDS = 2
|
||||
|
||||
def __init__(self, index: int, env: NeonEnv, branch: str):
|
||||
self.index = index
|
||||
self.env = env
|
||||
@@ -341,7 +339,7 @@ class BackgroundCompute(object):
|
||||
|
||||
# With less sleep, there is a very big chance of not committing
|
||||
# anything or only 1 xact during test run.
|
||||
await asyncio.sleep(random.uniform(0, self.MAX_QUERY_GAP_SECONDS))
|
||||
await asyncio.sleep(2 * random.random())
|
||||
self.running = False
|
||||
|
||||
|
||||
@@ -358,34 +356,20 @@ async def run_concurrent_computes(env: NeonEnv,
|
||||
background_tasks = [asyncio.create_task(compute.run()) for compute in computes]
|
||||
|
||||
await asyncio.sleep(run_seconds)
|
||||
log.info("stopping all tasks but one")
|
||||
for compute in computes[1:]:
|
||||
compute.stopped = True
|
||||
await asyncio.gather(*background_tasks[1:])
|
||||
log.info("stopped all tasks but one")
|
||||
|
||||
# work for some time with only one compute -- it should be able to make some xacts
|
||||
TIMEOUT_SECONDS = computes[0].MAX_QUERY_GAP_SECONDS + 3
|
||||
initial_queries_by_0 = len(computes[0].successful_queries)
|
||||
log.info(f'Waiting for another query by computes[0], '
|
||||
f'it already had {initial_queries_by_0}, timeout is {TIMEOUT_SECONDS}s')
|
||||
for _ in range(10 * TIMEOUT_SECONDS):
|
||||
current_queries_by_0 = len(computes[0].successful_queries) - initial_queries_by_0
|
||||
if current_queries_by_0 >= 1:
|
||||
log.info(f'Found {current_queries_by_0} successful queries '
|
||||
f'by computes[0], completing the test')
|
||||
break
|
||||
await asyncio.sleep(0.1)
|
||||
else:
|
||||
assert False, "Timed out while waiting for another query by computes[0]"
|
||||
await asyncio.sleep(8)
|
||||
computes[0].stopped = True
|
||||
|
||||
await asyncio.gather(background_tasks[0])
|
||||
await asyncio.gather(*background_tasks)
|
||||
|
||||
result = await exec_compute_query(env, branch, 'SELECT * FROM query_log')
|
||||
# we should have inserted something while single compute was running
|
||||
log.info(f'Executed {len(result)} queries, {current_queries_by_0} of them '
|
||||
f'by computes[0] after we started stopping the others')
|
||||
assert len(result) >= 4
|
||||
log.info(f'Executed {len(result)} queries')
|
||||
for row in result:
|
||||
log.info(f'{row[0]} {row[1]} {row[2]}')
|
||||
|
||||
|
||||
@@ -1276,9 +1276,12 @@ class WalCraft(AbstractNeonCli):
|
||||
res.check_returncode()
|
||||
return res.stdout.split('\n')
|
||||
|
||||
def in_existing(self, type: str, connection: str) -> None:
|
||||
def in_existing(self, type: str, connection: str) -> int:
|
||||
res = self.raw_cli(["in-existing", type, connection])
|
||||
res.check_returncode()
|
||||
m = re.fullmatch(r'end_of_wal = (.*)\n', res.stdout)
|
||||
assert m
|
||||
return lsn_from_hex(m.group(1))
|
||||
|
||||
|
||||
class NeonPageserver(PgProtocol):
|
||||
|
||||
@@ -83,9 +83,6 @@ def get_dir_size(path: str) -> int:
|
||||
totalbytes = 0
|
||||
for root, dirs, files in os.walk(path):
|
||||
for name in files:
|
||||
try:
|
||||
totalbytes += os.path.getsize(os.path.join(root, name))
|
||||
except FileNotFoundError as e:
|
||||
pass # file could be concurrently removed
|
||||
totalbytes += os.path.getsize(os.path.join(root, name))
|
||||
|
||||
return totalbytes
|
||||
|
||||
2
vendor/postgres
vendored
2
vendor/postgres
vendored
Submodule vendor/postgres updated: 8b38038ac6...9c99008445
@@ -33,9 +33,7 @@ itoa = { version = "0.4", features = ["i128", "std"] }
|
||||
libc = { version = "0.2", features = ["extra_traits", "std"] }
|
||||
log = { version = "0.4", default-features = false, features = ["serde", "std"] }
|
||||
memchr = { version = "2", features = ["std", "use_std"] }
|
||||
nom = { version = "7", features = ["alloc", "std"] }
|
||||
num-bigint = { version = "0.4", features = ["std"] }
|
||||
num-integer = { version = "0.1", default-features = false, features = ["i128", "std"] }
|
||||
num-integer = { version = "0.1", default-features = false, features = ["i128"] }
|
||||
num-traits = { version = "0.2", features = ["i128", "std"] }
|
||||
prost = { version = "0.10", features = ["prost-derive", "std"] }
|
||||
rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "rand_hc", "small_rng", "std", "std_rng"] }
|
||||
@@ -43,11 +41,10 @@ regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cac
|
||||
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
|
||||
scopeguard = { version = "1", features = ["use_std"] }
|
||||
serde = { version = "1", features = ["alloc", "derive", "serde_derive", "std"] }
|
||||
time = { version = "0.3", features = ["alloc", "formatting", "itoa", "macros", "parsing", "quickcheck", "quickcheck-dep", "std", "time-macros"] }
|
||||
tokio = { version = "1", features = ["bytes", "fs", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "once_cell", "process", "rt", "rt-multi-thread", "signal-hook-registry", "socket2", "sync", "time", "tokio-macros", "winapi"] }
|
||||
tokio = { version = "1", features = ["bytes", "fs", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "once_cell", "process", "rt", "rt-multi-thread", "signal-hook-registry", "socket2", "sync", "time", "tokio-macros"] }
|
||||
tokio-util = { version = "0.7", features = ["codec", "io"] }
|
||||
tracing = { version = "0.1", features = ["attributes", "log", "std", "tracing-attributes"] }
|
||||
tracing-core = { version = "0.1", features = ["lazy_static", "std", "valuable"] }
|
||||
tracing-core = { version = "0.1", features = ["lazy_static", "std"] }
|
||||
|
||||
[build-dependencies]
|
||||
ahash = { version = "0.7", features = ["std"] }
|
||||
@@ -60,7 +57,6 @@ indexmap = { version = "1", default-features = false, features = ["std"] }
|
||||
libc = { version = "0.2", features = ["extra_traits", "std"] }
|
||||
log = { version = "0.4", default-features = false, features = ["serde", "std"] }
|
||||
memchr = { version = "2", features = ["std", "use_std"] }
|
||||
nom = { version = "7", features = ["alloc", "std"] }
|
||||
prost = { version = "0.10", features = ["prost-derive", "std"] }
|
||||
regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
|
||||
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
|
||||
|
||||
Reference in New Issue
Block a user