Compare commits

...

16 Commits

Author SHA1 Message Date
Arpad Müller
229157e323 Enable blob_batch deletions 2024-12-10 00:28:51 +01:00
Evan Fleming
b593e51eae safekeeper: use arc for global timelines and config (#10051)
Hello! I was interested in potentially making some contributions to Neon,
and looking through the issue backlog I found
[8200](https://github.com/neondatabase/neon/issues/8200), which seemed
like a good first issue to tackle. I see it was assigned a while ago, so
apologies if I'm stepping on any toes with this PR. I also apologize for
the size of this PR; I'm not sure there is a simple way to reduce it
given the footprint of the components being changed.

## Problem
This PR addresses part of the problem outlined in issue
[8200](https://github.com/neondatabase/neon/issues/8200). Namely, it
removes global static usage of timeline state in favour of
`Arc<GlobalTimelines>` and replaces wasteful clones of
`SafeKeeperConf` with `Arc<SafeKeeperConf>`. I did not tackle
`RemoteStorage` in this PR to minimize the amount of changes, as the PR
is already quite large. I also did not introduce a
`SafekeeperApp` wrapper struct, again to minimize changes, but I can
tackle either or both of these omissions in this PR if folks would like.

## Summary of changes
- Remove static usage of `GlobalTimelines` in favour of
`Arc<GlobalTimelines>`
- Wrap `SafeKeeperConf` in `Arc` to avoid wasteful clones of the
underlying struct
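
The shape of the change can be sketched with stand-in types (the struct names mirror the real ones, but the fields and wiring here are illustrative): configuration is wrapped in an `Arc` once, and every consumer clones the pointer, never the struct.

```rust
use std::sync::Arc;

// Hypothetical stand-ins for the real safekeeper types; actual fields differ.
struct SafeKeeperConf {
    listen_pg_addr: String,
}

struct GlobalTimelines {
    conf: Arc<SafeKeeperConf>,
}

// Tasks receive Arc clones instead of reading a global static.
fn task_sees_conf(timelines: &Arc<GlobalTimelines>) -> &str {
    &timelines.conf.listen_pg_addr
}

fn main() {
    let conf = Arc::new(SafeKeeperConf {
        listen_pg_addr: "127.0.0.1:5454".to_string(),
    });
    let timelines = Arc::new(GlobalTimelines {
        conf: Arc::clone(&conf),
    });
    // Cloning the Arc bumps a refcount; the conf struct itself is never copied.
    let for_task = Arc::clone(&timelines);
    assert_eq!(task_sees_conf(&for_task), "127.0.0.1:5454");
    assert_eq!(Arc::strong_count(&conf), 2);
}
```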

## Some additional thoughts
- We currently store `SafeKeeperConf` in `GlobalTimelines` and
expose it through a public `get_global_config` function, which
requires locking. This seems needlessly wasteful; based on observed
usage, we could remove this public accessor and force consumers to
acquire `SafeKeeperConf` through the new `Arc` reference.
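
A minimal sketch of that point, with hypothetical simplified types (`TimelinesLocked` and `wal_dir` are illustrative, not the real code):

```rust
use std::sync::{Arc, Mutex};

struct SafeKeeperConf {
    wal_dir: String,
}

// Rough old shape: conf lives behind a lock, so even read-only
// access pays for locking plus a clone.
struct TimelinesLocked {
    conf: Mutex<SafeKeeperConf>,
}

fn get_global_config(t: &TimelinesLocked) -> String {
    t.conf.lock().unwrap().wal_dir.clone() // lock + clone per call
}

// Proposed shape: consumers hold Arc<SafeKeeperConf> directly.
fn read_direct(conf: &Arc<SafeKeeperConf>) -> &str {
    &conf.wal_dir // no lock, no clone
}

fn main() {
    let locked = TimelinesLocked {
        conf: Mutex::new(SafeKeeperConf { wal_dir: "/wal".into() }),
    };
    let shared = Arc::new(SafeKeeperConf { wal_dir: "/wal".into() });
    assert_eq!(get_global_config(&locked), read_direct(&shared));
}
```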
2024-12-09 21:09:20 +00:00
Alex Chi Z.
4c4cb80186 fix(pageserver): fix gc-compaction racing with legacy gc (#10052)
## Problem

close https://github.com/neondatabase/neon/issues/10049, close
https://github.com/neondatabase/neon/issues/10030, close
https://github.com/neondatabase/neon/issues/8861

part of https://github.com/neondatabase/neon/issues/9114

The legacy gc process calls `get_latest_gc_cutoff`, which uses an Rcu
separate from the gc_info struct. In the gc_compaction_smoke test case,
the "latest" cutoff could be lower than the one in gc_info, causing
gc-compaction to collect data that could still be accessed via
`latest_gc_cutoff`. Technically speaking, there's nothing wrong with
gc-compaction using gc_info without considering latest_gc_cutoff,
because gc_info is the source of truth, but let's fix it anyway.

## Summary of changes

* gc-compaction uses `latest_gc_cutoff` instead of gc_info to determine
the gc horizon.
* if a gc-compaction is scheduled via tenant compaction iteration, it
will take the gc_block lock to avoid racing with functionalities like
detach ancestor (if it's triggered via manual compaction API without
scheduling, then it won't take the lock)
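
One way to picture the horizon choice, with `Lsn` reduced to a plain integer and a hypothetical helper name: gc-compaction must never collect above whichever cutoff is lower, and the Rcu-published `latest_gc_cutoff` can lag behind gc_info.

```rust
// Lsn simplified to u64 for illustration.
type Lsn = u64;

// Hypothetical helper: pick the horizon gc-compaction may collect up to.
fn gc_compaction_horizon(latest_gc_cutoff: Lsn, gc_info_cutoff: Lsn) -> Lsn {
    // latest_gc_cutoff can be lower (older) than gc_info's cutoff;
    // collecting only up to the smaller value keeps reads that go
    // through latest_gc_cutoff safe.
    latest_gc_cutoff.min(gc_info_cutoff)
}

fn main() {
    // latest_gc_cutoff lags behind gc_info: must not collect past it.
    assert_eq!(gc_compaction_horizon(100, 150), 100);
    // In the steady state the two agree.
    assert_eq!(gc_compaction_horizon(150, 150), 150);
}
```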

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2024-12-09 20:06:06 +00:00
a-masterov
92273b6d5e Enable the pg_regress tests on staging for PG17 (#9978)
## Problem
Currently, we run the `pg_regress` tests only for PG16. However, PG17
is part of Neon and should be tested as well.
## Summary of changes
Modified the workflow and added a patch for PG17 to enable the
`pg_regress` tests.
The problem with leftovers was solved by using branches.
2024-12-09 19:30:39 +00:00
Arpad Müller
e74e7aac93 Use updated patched azure SDK crates (#10036)
For a while already, we've been unable to update the Azure SDK crates
due to Azure adopting use of a non-tokio async runtime, see #7545.

The effort to upstream the fix got stalled, and I think it's better to
switch to a patched version of the SDK that is up to date.

Now we have a fork of the SDK under the neondatabase github org, to
which I have applied Conrad's rebased patches:
https://github.com/neondatabase/azure-sdk-for-rust/tree/neon .

The existence of a fork will also help with shipping bulk delete support
before it's upstreamed (#7931).

Also, in related news, Azure SDK development has split: the `main`
branch pertains to a future, to-be-officially-blessed release of the
SDK, while the older versions we are currently using live on the
`legacy` branch. Upstream doesn't really want patches for the `legacy`
branch any more; they want to focus on the `main` efforts. Even so, the
`legacy` branch is still newer than what we have right now, so let's
switch to `legacy` for now.

Depending on how long it takes, we can switch to the official version of
the SDK once it's released, or switch to the upstream `main` branch if
there are changes we want before that.

As a nice side effect of this PR, we now use reqwest 0.12 everywhere,
dropping the dependency on version 0.11.

Fixes #7545
2024-12-09 15:50:06 +00:00
Vlad Lazar
4cca5cdb12 deps: update url to 2.5.4 for RUSTSEC-2024-0421 (#10059)
## Problem

See https://rustsec.org/advisories/RUSTSEC-2024-0421

## Summary of changes

Update url crate to 2.5.4.
2024-12-09 14:57:42 +00:00
Arpad Müller
9d425b54f7 Update AWS SDK crates (#10056)
Result of running:

cargo update -p aws-types -p aws-sigv4 -p aws-credential-types -p
aws-smithy-types -p aws-smithy-async -p aws-sdk-kms -p aws-sdk-iam -p
aws-sdk-s3 -p aws-config

We want to keep the AWS SDK up to date as that way we benefit from new
developments and improvements.
2024-12-09 12:46:59 +00:00
John Spray
ec790870d5 storcon: automatically clear Pause/Stop scheduling policies to enable detaches (#10011)
## Problem

We saw a tenant get stuck when it had been put into Pause scheduling
mode to pin it to a pageserver, then it was left idle for a while and
the control plane tried to detach it.

Close: https://github.com/neondatabase/neon/issues/9957

## Summary of changes

- When changing policy to Detached or Secondary, set the scheduling
policy to Active.
- Add a test that exercises this
- When persisting tenant shards, set their `generation_pageserver` to
null if the placement policy is not Attached (this enables consistency
checks to work, and avoids leaving state in the DB that could be
confusing/misleading in future)
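
The first bullet can be condensed into a sketch (enums simplified and hypothetical; the real `ShardSchedulingPolicy` has more variants and the transition lives in the storage controller):

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum PlacementPolicy {
    Attached,
    Secondary,
    Detached,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum ShardSchedulingPolicy {
    Active,
    Pause,
    Stop,
}

// Illustrative rule: moving away from Attached clears any Pause/Stop
// pin, so the detach can actually be scheduled instead of getting stuck.
fn scheduling_policy_after(
    new_placement: PlacementPolicy,
    current: ShardSchedulingPolicy,
) -> ShardSchedulingPolicy {
    match new_placement {
        PlacementPolicy::Attached => current,
        PlacementPolicy::Secondary | PlacementPolicy::Detached => ShardSchedulingPolicy::Active,
    }
}

fn main() {
    // The stuck case from the issue: a paused tenant being detached.
    assert_eq!(
        scheduling_policy_after(PlacementPolicy::Detached, ShardSchedulingPolicy::Pause),
        ShardSchedulingPolicy::Active
    );
    // A pinned shard that stays Attached keeps its pin.
    assert_eq!(
        scheduling_policy_after(PlacementPolicy::Attached, ShardSchedulingPolicy::Pause),
        ShardSchedulingPolicy::Pause
    );
}
```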
2024-12-07 13:05:09 +00:00
Christian Schwarz
4d7111f240 page_service: don't count time spent flushing towards smgr latency metrics (#10042)
## Problem

In #9962 I changed the smgr metrics to include time spent on flush.

It isn't under our (=storage team's) control how long that flush takes
because the client can stop reading requests.

## Summary of changes

Stop the timer as soon as we've buffered up the response in the
`pgb_writer`.

Track flush time in a separate metric.
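
A rough sketch of the timing split (all names illustrative; the real code threads this through `SmgrOpTimer` and Prometheus counters): the latency clock stops once the response is buffered, and the client-paced flush is accounted separately.

```rust
use std::time::{Duration, Instant};

// Illustrative only: two separate observations per request.
struct Observed {
    smgr_latency: Duration,
    flush_time: Duration,
}

fn handle<F: FnOnce(), G: FnOnce()>(produce: F, flush: G) -> Observed {
    let start = Instant::now();
    produce(); // build the response into the pgb_writer buffer
    let smgr_latency = start.elapsed(); // observed BEFORE flushing
    let flush_start = Instant::now();
    flush(); // client-paced; may stall arbitrarily long
    Observed {
        smgr_latency,
        flush_time: flush_start.elapsed(),
    }
}

fn main() {
    let o = handle(
        || std::thread::sleep(Duration::from_millis(5)),
        || std::thread::sleep(Duration::from_millis(20)),
    );
    // A slow flush no longer inflates the smgr latency figure.
    assert!(o.smgr_latency >= Duration::from_millis(5));
    assert!(o.flush_time >= Duration::from_millis(20));
}
```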

---------

Co-authored-by: Yuchen Liang <70461588+yliang412@users.noreply.github.com>
2024-12-07 08:57:55 +00:00
Alex Chi Z.
b1fd086c0c test(pageserver): disable gc_compaction smoke test for now (#10045)
## Problem

The test is flaky.

## Summary of changes

Disable the test.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-12-06 22:30:04 +00:00
Heikki Linnakangas
b6eea65597 Fix error message if PS connection is lost while receiving prefetch (#9923)
If the pageserver connection is lost while receiving the prefetch
request, the prefetch queue is cleared. The error message prints the
values from the prefetch slot, but because the slot was already cleared,
they're all zeros:

LOG: [NEON_SMGR] [shard 0] No response from reading prefetch entry 0:
0/0/0.0 block 0. This can be caused by a concurrent disconnect

To fix, make local copies of the values.

In passing, also add a sanity check that if the receive() call
succeeds, the prefetch slot is still intact.
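
A minimal sketch of the fix, with a hypothetical simplified slot type: the values are copied out before any path that can clear the queue, so the error message reports real values instead of zeros.

```rust
// Hypothetical simplified prefetch slot; the real one has more fields.
#[derive(Default, Clone, Copy)]
struct PrefetchSlot {
    blkno: u32,
    lsn: u64,
}

fn receive(slot: &mut PrefetchSlot, disconnected: bool) -> Result<(), String> {
    // Local copies taken up front; `slot` may be zeroed below.
    let (blkno, lsn) = (slot.blkno, slot.lsn);
    if disconnected {
        *slot = PrefetchSlot::default(); // prefetch queue cleared on disconnect
        return Err(format!(
            "No response from reading prefetch entry block {blkno} at lsn {lsn}"
        ));
    }
    Ok(())
}

fn main() {
    let mut slot = PrefetchSlot { blkno: 42, lsn: 7 };
    let err = receive(&mut slot, true).unwrap_err();
    // The message reports the original values, not the cleared zeros.
    assert!(err.contains("block 42") && err.contains("lsn 7"));
    assert_eq!(slot.blkno, 0);
}
```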
2024-12-06 20:56:57 +00:00
Alex Chi Z.
c42c28b339 feat(pageserver): gc-compaction split job and partial scheduler (#9897)
## Problem

part of https://github.com/neondatabase/neon/issues/9114, stacked PR
over #9809

The compaction scheduler now schedules partial compaction jobs.

## Summary of changes

* Add the compaction job splitter based on size.
* Schedule subcompactions using the compaction scheduler.
* Test subcompaction scheduler in the smoke regress test.
* Temporarily disable layer map checks
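
The size-based splitter can be sketched roughly like this (greedy packing over plain byte sizes; the real splitter also works over key ranges, so this is only an illustration):

```rust
// Illustrative splitter: pack layer sizes into sub-jobs whose total
// stays under a byte budget.
fn split_compaction_job(layer_sizes: &[u64], max_job_bytes: u64) -> Vec<Vec<u64>> {
    let mut jobs = Vec::new();
    let mut current = Vec::new();
    let mut current_bytes = 0u64;
    for &size in layer_sizes {
        if !current.is_empty() && current_bytes + size > max_job_bytes {
            // Budget exceeded: seal the current sub-job and start a new one.
            jobs.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
        current.push(size);
        current_bytes += size;
    }
    if !current.is_empty() {
        jobs.push(current);
    }
    jobs
}

fn main() {
    let jobs = split_compaction_job(&[40, 50, 30, 80, 10], 100);
    assert_eq!(jobs, vec![vec![40, 50], vec![30], vec![80, 10]]);
    // Nothing is lost across the split.
    let total: u64 = jobs.iter().flatten().sum();
    assert_eq!(total, 210);
}
```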

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-12-06 18:44:26 +00:00
Tristan Partin
e4837b0a5a Bump sql_exporter to 0.16.0 (#10041)
Signed-off-by: Tristan Partin <tristan@neon.tech>
2024-12-06 17:43:55 +00:00
Erik Grinaker
14c4fae64a test_runner/performance: add improved bulk insert benchmark (#9812)
Adds an improved bulk insert benchmark, including S3 uploads.

Touches #9789.
2024-12-06 15:17:15 +00:00
Vlad Lazar
cc70fc802d pageserver: add metric for number of wal records received by each shard (#10035)
## Problem

With the current metrics we can't identify which shards are ingesting
data at any given time.

## Summary of changes

Add a metric for the number of wal records received for processing by
each shard. This is per (tenant, timeline, shard).
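
A stdlib-only stand-in for what a labelled per-(tenant, timeline, shard) counter tracks (the real metric is a Prometheus `IntCounterVec`, registered in the pageserver's metrics module):

```rust
use std::collections::HashMap;

// One counter per (tenant, timeline, shard) label set.
#[derive(Default)]
struct WalRecordsReceived {
    counts: HashMap<(String, String, String), u64>,
}

impl WalRecordsReceived {
    fn inc(&mut self, tenant: &str, timeline: &str, shard: &str) {
        let key = (tenant.to_string(), timeline.to_string(), shard.to_string());
        *self.counts.entry(key).or_insert(0) += 1;
    }

    fn get(&self, tenant: &str, timeline: &str, shard: &str) -> u64 {
        let key = (tenant.to_string(), timeline.to_string(), shard.to_string());
        *self.counts.get(&key).unwrap_or(&0)
    }
}

fn main() {
    let mut metric = WalRecordsReceived::default();
    // Two shards of the same tenant/timeline ingesting at different rates
    // now show up as distinct series.
    for _ in 0..3 {
        metric.inc("t1", "tl1", "0001");
    }
    metric.inc("t1", "tl1", "0002");
    assert_eq!(metric.get("t1", "tl1", "0001"), 3);
    assert_eq!(metric.get("t1", "tl1", "0002"), 1);
}
```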
2024-12-06 12:51:41 +00:00
Alexey Kondratov
fa07097f2f chore: Reorganize and refresh CODEOWNERS (#10008)
## Problem

We didn't have a codeowner for `/compute`, so nobody was auto-assigned
for PRs like #9973

## Summary of changes

While at it:
1. Group codeowners into sections.
2. Remove control plane from the `/compute_tools` because it's primarily
the internal `compute_ctl` code.
3. Add control plane (and compute) to `/libs/compute_api` because that's
the shared public interface of the compute.
2024-12-06 11:44:50 +00:00
47 changed files with 5586 additions and 551 deletions

View File

@@ -21,3 +21,5 @@ config-variables:
   - SLACK_UPCOMING_RELEASE_CHANNEL_ID
   - DEV_AWS_OIDC_ROLE_ARN
   - BENCHMARK_INGEST_TARGET_PROJECTID
+  - PGREGRESS_PG16_PROJECT_ID
+  - PGREGRESS_PG17_PROJECT_ID

View File

@@ -23,11 +23,14 @@ jobs:
   regress:
     env:
       POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
-      DEFAULT_PG_VERSION: 16
       TEST_OUTPUT: /tmp/test_output
       BUILD_TYPE: remote
       AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
       AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
+    strategy:
+      fail-fast: false
+      matrix:
+        pg-version: [16, 17]
     runs-on: us-east-2
     container:
@@ -40,9 +43,11 @@ jobs:
           submodules: true
       - name: Patch the test
+        env:
+          PG_VERSION: ${{matrix.pg-version}}
         run: |
-          cd "vendor/postgres-v${DEFAULT_PG_VERSION}"
-          patch -p1 < "../../compute/patches/cloud_regress_pg${DEFAULT_PG_VERSION}.patch"
+          cd "vendor/postgres-v${PG_VERSION}"
+          patch -p1 < "../../compute/patches/cloud_regress_pg${PG_VERSION}.patch"
      - name: Generate a random password
        id: pwgen
@@ -55,8 +60,9 @@ jobs:
       - name: Change tests according to the generated password
         env:
           DBPASS: ${{ steps.pwgen.outputs.DBPASS }}
+          PG_VERSION: ${{matrix.pg-version}}
         run: |
-          cd vendor/postgres-v"${DEFAULT_PG_VERSION}"/src/test/regress
+          cd vendor/postgres-v"${PG_VERSION}"/src/test/regress
           for fname in sql/*.sql expected/*.out; do
             sed -i.bak s/NEON_PASSWORD_PLACEHOLDER/"'${DBPASS}'"/ "${fname}"
           done
@@ -73,15 +79,29 @@ jobs:
           path: /tmp/neon/
           prefix: latest
+      - name: Create a new branch
+        id: create-branch
+        uses: ./.github/actions/neon-branch-create
+        with:
+          api_key: ${{ secrets.NEON_STAGING_API_KEY }}
+          project_id: ${{ vars[format('PGREGRESS_PG{0}_PROJECT_ID', matrix.pg-version)] }}
       - name: Run the regression tests
         uses: ./.github/actions/run-python-test-set
         with:
           build_type: ${{ env.BUILD_TYPE }}
           test_selection: cloud_regress
-          pg_version: ${{ env.DEFAULT_PG_VERSION }}
+          pg_version: ${{matrix.pg-version}}
           extra_params: -m remote_cluster
         env:
-          BENCHMARK_CONNSTR: ${{ secrets.PG_REGRESS_CONNSTR }}
+          BENCHMARK_CONNSTR: ${{steps.create-branch.outputs.dsn}}
+      - name: Delete branch
+        uses: ./.github/actions/neon-branch-delete
+        with:
+          api_key: ${{ secrets.NEON_STAGING_API_KEY }}
+          project_id: ${{ vars[format('PGREGRESS_PG{0}_PROJECT_ID', matrix.pg-version)] }}
+          branch_id: ${{steps.create-branch.outputs.branch_id}}
       - name: Create Allure report
         id: create-allure-report

View File

@@ -1,16 +1,29 @@
-/.github/ @neondatabase/developer-productivity
-/compute_tools/ @neondatabase/control-plane @neondatabase/compute
-/libs/pageserver_api/ @neondatabase/storage
-/libs/postgres_ffi/ @neondatabase/compute @neondatabase/storage
-/libs/proxy/ @neondatabase/proxy
-/libs/remote_storage/ @neondatabase/storage
-/libs/safekeeper_api/ @neondatabase/storage
+# Autoscaling
 /libs/vm_monitor/ @neondatabase/autoscaling
-/pageserver/ @neondatabase/storage
+
+# DevProd
+/.github/ @neondatabase/developer-productivity
+
+# Compute
 /pgxn/ @neondatabase/compute
-/pgxn/neon/ @neondatabase/compute @neondatabase/storage
+/vendor/ @neondatabase/compute
+/compute/ @neondatabase/compute
+/compute_tools/ @neondatabase/compute
+
+# Proxy
+/libs/proxy/ @neondatabase/proxy
 /proxy/ @neondatabase/proxy
+
+# Storage
+/pageserver/ @neondatabase/storage
 /safekeeper/ @neondatabase/storage
 /storage_controller @neondatabase/storage
 /storage_scrubber @neondatabase/storage
-/vendor/ @neondatabase/compute
+/libs/pageserver_api/ @neondatabase/storage
+/libs/remote_storage/ @neondatabase/storage
+/libs/safekeeper_api/ @neondatabase/storage
+
+# Shared
+/pgxn/neon/ @neondatabase/compute @neondatabase/storage
+/libs/compute_api/ @neondatabase/compute @neondatabase/control-plane
+/libs/postgres_ffi/ @neondatabase/compute @neondatabase/storage

Cargo.lock (generated) — 515 changes

File diff suppressed because it is too large.

View File

@@ -51,10 +51,6 @@ anyhow = { version = "1.0", features = ["backtrace"] }
 arc-swap = "1.6"
 async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
 atomic-take = "1.1.0"
-azure_core = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"] }
-azure_identity = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
-azure_storage = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
-azure_storage_blobs = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
 flate2 = "1.0.26"
 async-stream = "0.3"
 async-trait = "0.1"
@@ -216,6 +212,12 @@ postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
 postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
 tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
 
+## Azure SDK crates
+azure_core = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "arpad/blob_batch", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"] }
+azure_identity = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "arpad/blob_batch", default-features = false, features = ["enable_reqwest_rustls"] }
+azure_storage = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "arpad/blob_batch", default-features = false, features = ["enable_reqwest_rustls"] }
+azure_storage_blobs = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "arpad/blob_batch", default-features = false, features = ["enable_reqwest_rustls"] }
+
 ## Local libraries
 compute_api = { version = "0.1", path = "./libs/compute_api/" }
 consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }

View File

@@ -115,7 +115,7 @@ RUN set -e \
 # Keep the version the same as in compute/compute-node.Dockerfile and
 # test_runner/regress/test_compute_metrics.py.
-ENV SQL_EXPORTER_VERSION=0.13.1
+ENV SQL_EXPORTER_VERSION=0.16.0
 RUN curl -fsSL \
     "https://github.com/burningalchemist/sql_exporter/releases/download/${SQL_EXPORTER_VERSION}/sql_exporter-${SQL_EXPORTER_VERSION}.linux-$(case "$(uname -m)" in x86_64) echo amd64;; aarch64) echo arm64;; esac).tar.gz" \
     --output sql_exporter.tar.gz \

View File

@@ -1324,7 +1324,7 @@ FROM quay.io/prometheuscommunity/postgres-exporter:v0.12.1 AS postgres-exporter
 # Keep the version the same as in build-tools.Dockerfile and
 # test_runner/regress/test_compute_metrics.py.
-FROM burningalchemist/sql_exporter:0.13.1 AS sql-exporter
+FROM burningalchemist/sql_exporter:0.16.0 AS sql-exporter
 #########################################################################################
 #

File diff suppressed because it is too large.

View File

@@ -42,6 +42,7 @@ allow = [
     "MPL-2.0",
     "OpenSSL",
     "Unicode-DFS-2016",
+    "Unicode-3.0",
 ]
 confidence-threshold = 0.8
 exceptions = [

View File

@@ -245,6 +245,17 @@ impl From<NodeAvailability> for NodeAvailabilityWrapper {
     }
 }
 
+/// Scheduling policy enables us to selectively disable some automatic actions that the
+/// controller performs on a tenant shard. This is only set to a non-default value by
+/// human intervention, and it is reset to the default value (Active) when the tenant's
+/// placement policy is modified away from Attached.
+///
+/// The typical use of a non-Active scheduling policy is one of:
+/// - Pinnning a shard to a node (i.e. migrating it there & setting a non-Active scheduling policy)
+/// - Working around a bug (e.g. if something is flapping and we need to stop it until the bug is fixed)
+///
+/// If you're not sure which policy to use to pin a shard to its current location, you probably
+/// want Pause.
 #[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
 pub enum ShardSchedulingPolicy {
     // Normal mode: the tenant's scheduled locations may be updated at will, including

View File

@@ -8,15 +8,14 @@ use std::io;
 use std::num::NonZeroU32;
 use std::pin::Pin;
 use std::str::FromStr;
-use std::sync::Arc;
 use std::time::Duration;
 use std::time::SystemTime;
 
 use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
+use anyhow::Context;
 use anyhow::Result;
 use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
 use azure_core::{Continuable, RetryOptions};
-use azure_identity::DefaultAzureCredential;
 use azure_storage::StorageCredentials;
 use azure_storage_blobs::blob::CopyStatus;
 use azure_storage_blobs::prelude::ClientBuilder;
@@ -76,8 +75,9 @@ impl AzureBlobStorage {
         let credentials = if let Ok(access_key) = env::var("AZURE_STORAGE_ACCESS_KEY") {
             StorageCredentials::access_key(account.clone(), access_key)
         } else {
-            let token_credential = DefaultAzureCredential::default();
-            StorageCredentials::token_credential(Arc::new(token_credential))
+            let token_credential = azure_identity::create_default_credential()
+                .context("trying to obtain Azure default credentials")?;
+            StorageCredentials::token_credential(token_credential)
         };
 
         // we have an outer retry
@@ -556,7 +556,7 @@ impl RemoteStorage for AzureBlobStorage {
         let op = async {
             // TODO batch requests are not supported by the SDK
             // https://github.com/Azure/azure-sdk-for-rust/issues/1068
-            for path in paths {
+            for path_chunk in paths.chunks(256) {
                 #[derive(Debug)]
                 enum AzureOrTimeout {
                     AzureError(azure_core::Error),
@@ -572,13 +572,20 @@ impl RemoteStorage for AzureBlobStorage {
                 let max_retries = 5;
                 backoff::retry(
                     || async {
-                        let blob_client = self.client.blob_client(self.relative_path_to_name(path));
+                        let mut batch_client = self.client.blob_batch();
+                        for path in path_chunk {
+                            batch_client = match batch_client.delete(self.relative_path_to_name(path)) {
+                                Ok(batch_client) => batch_client,
+                                Err(e) => return Err(AzureOrTimeout::AzureError(e)),
+                            };
+                        }
 
-                        let request = blob_client.delete().into_future();
+                        let request = batch_client.into_future();
                         let res = tokio::time::timeout(self.timeout, request).await;
                         match res {
+                            // TODO: validate that all deletions were successful
                             Ok(Ok(_v)) => Ok(()),
                             Ok(Err(azure_err)) => {
                                 if let Some(http_err) = azure_err.as_http_error() {

View File

@@ -2036,15 +2036,23 @@ async fn timeline_compact_handler(
         parse_query_param::<_, bool>(&request, "wait_until_scheduled_compaction_done")?
             .unwrap_or(false);
 
+    let sub_compaction = compact_request
+        .as_ref()
+        .map(|r| r.sub_compaction)
+        .unwrap_or(false);
     let options = CompactOptions {
         compact_range: compact_request
             .as_ref()
             .and_then(|r| r.compact_range.clone()),
         compact_below_lsn: compact_request.as_ref().and_then(|r| r.compact_below_lsn),
         flags,
+        sub_compaction,
     };
-    let scheduled = compact_request.map(|r| r.scheduled).unwrap_or(false);
+    let scheduled = compact_request
+        .as_ref()
+        .map(|r| r.scheduled)
+        .unwrap_or(false);
 
     async {
         let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
@@ -2053,7 +2061,7 @@ async fn timeline_compact_handler(
             let tenant = state
                 .tenant_manager
                 .get_attached_tenant_shard(tenant_shard_id)?;
-            let rx = tenant.schedule_compaction(timeline_id, options).await;
+            let rx = tenant.schedule_compaction(timeline_id, options).await.map_err(ApiError::InternalServerError)?;
             if wait_until_scheduled_compaction_done {
                 // It is possible that this will take a long time, dropping the HTTP request will not cancel the compaction.
                 rx.await.ok();

View File

@@ -1223,31 +1223,60 @@ pub(crate) mod virtual_file_io_engine {
}); });
} }
pub(crate) struct SmgrOpTimer { pub(crate) struct SmgrOpTimer(Option<SmgrOpTimerInner>);
pub(crate) struct SmgrOpTimerInner {
global_latency_histo: Histogram, global_latency_histo: Histogram,
// Optional because not all op types are tracked per-timeline // Optional because not all op types are tracked per-timeline
per_timeline_latency_histo: Option<Histogram>, per_timeline_latency_histo: Option<Histogram>,
global_flush_in_progress_micros: IntCounter,
per_timeline_flush_in_progress_micros: IntCounter,
start: Instant, start: Instant,
throttled: Duration, throttled: Duration,
op: SmgrQueryType, op: SmgrQueryType,
} }
pub(crate) struct SmgrOpFlushInProgress {
base: Instant,
global_micros: IntCounter,
per_timeline_micros: IntCounter,
}
impl SmgrOpTimer { impl SmgrOpTimer {
pub(crate) fn deduct_throttle(&mut self, throttle: &Option<Duration>) { pub(crate) fn deduct_throttle(&mut self, throttle: &Option<Duration>) {
let Some(throttle) = throttle else { let Some(throttle) = throttle else {
return; return;
}; };
self.throttled += *throttle; let inner = self.0.as_mut().expect("other public methods consume self");
inner.throttled += *throttle;
} }
}
impl Drop for SmgrOpTimer { pub(crate) fn observe_smgr_op_completion_and_start_flushing(mut self) -> SmgrOpFlushInProgress {
fn drop(&mut self) { let (flush_start, inner) = self
let elapsed = self.start.elapsed(); .smgr_op_end()
.expect("this method consume self, and the only other caller is drop handler");
let SmgrOpTimerInner {
global_flush_in_progress_micros,
per_timeline_flush_in_progress_micros,
..
} = inner;
SmgrOpFlushInProgress {
base: flush_start,
global_micros: global_flush_in_progress_micros,
per_timeline_micros: per_timeline_flush_in_progress_micros,
}
}
let elapsed = match elapsed.checked_sub(self.throttled) { /// Returns `None`` if this method has already been called, `Some` otherwise.
fn smgr_op_end(&mut self) -> Option<(Instant, SmgrOpTimerInner)> {
let inner = self.0.take()?;
let now = Instant::now();
let elapsed = now - inner.start;
let elapsed = match elapsed.checked_sub(inner.throttled) {
Some(elapsed) => elapsed, Some(elapsed) => elapsed,
None => { None => {
use utils::rate_limit::RateLimit; use utils::rate_limit::RateLimit;
@@ -1258,9 +1287,9 @@ impl Drop for SmgrOpTimer {
}))) })))
}); });
let mut guard = LOGGED.lock().unwrap(); let mut guard = LOGGED.lock().unwrap();
let rate_limit = &mut guard[self.op]; let rate_limit = &mut guard[inner.op];
rate_limit.call(|| { rate_limit.call(|| {
warn!(op=?self.op, ?elapsed, ?self.throttled, "implementation error: time spent throttled exceeds total request wall clock time"); warn!(op=?inner.op, ?elapsed, ?inner.throttled, "implementation error: time spent throttled exceeds total request wall clock time");
}); });
elapsed // un-throttled time, more info than just saturating to 0 elapsed // un-throttled time, more info than just saturating to 0
} }
@@ -1268,10 +1297,54 @@ impl Drop for SmgrOpTimer {
let elapsed = elapsed.as_secs_f64(); let elapsed = elapsed.as_secs_f64();
self.global_latency_histo.observe(elapsed); inner.global_latency_histo.observe(elapsed);
if let Some(per_timeline_getpage_histo) = &self.per_timeline_latency_histo { if let Some(per_timeline_getpage_histo) = &inner.per_timeline_latency_histo {
per_timeline_getpage_histo.observe(elapsed); per_timeline_getpage_histo.observe(elapsed);
} }
Some((now, inner))
}
}
impl Drop for SmgrOpTimer {
fn drop(&mut self) {
self.smgr_op_end();
}
}
impl SmgrOpFlushInProgress {
pub(crate) async fn measure<Fut, O>(mut self, mut fut: Fut) -> O
where
Fut: std::future::Future<Output = O>,
{
let mut fut = std::pin::pin!(fut);
let now = Instant::now();
// Whenever observe_guard gets called, or dropped,
// it adds the time elapsed since its last call to metrics.
// Last call is tracked in `now`.
let mut observe_guard = scopeguard::guard(
|| {
let elapsed = now - self.base;
self.global_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
self.per_timeline_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
self.base = now;
},
|mut observe| {
observe();
},
);
loop {
match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
Ok(v) => return v,
Err(_timeout) => {
(*observe_guard)();
}
}
}
} }
} }
@@ -1302,6 +1375,8 @@ pub(crate) struct SmgrQueryTimePerTimeline {
per_timeline_getpage_latency: Histogram, per_timeline_getpage_latency: Histogram,
global_batch_size: Histogram, global_batch_size: Histogram,
per_timeline_batch_size: Histogram, per_timeline_batch_size: Histogram,
global_flush_in_progress_micros: IntCounter,
per_timeline_flush_in_progress_micros: IntCounter,
} }
static SMGR_QUERY_STARTED_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| { static SMGR_QUERY_STARTED_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
@@ -1464,6 +1539,26 @@ fn set_page_service_config_max_batch_size(conf: &PageServicePipeliningConfig) {
.set(value.try_into().unwrap()); .set(value.try_into().unwrap());
} }
static PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_page_service_pagestream_flush_in_progress_micros",
"Counter that sums up the microseconds that a pagestream response was being flushed into the TCP connection. \
If the flush is particularly slow, this counter will be updated periodically to make slow flushes \
easily discoverable in monitoring. \
Hence, this is NOT a completion latency historgram.",
&["tenant_id", "shard_id", "timeline_id"],
)
.expect("failed to define a metric")
});
static PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_page_service_pagestream_flush_in_progress_micros_global",
"Like pageserver_page_service_pagestream_flush_in_progress_seconds, but instance-wide.",
)
.expect("failed to define a metric")
});
impl SmgrQueryTimePerTimeline {
pub(crate) fn new(tenant_shard_id: &TenantShardId, timeline_id: &TimelineId) -> Self {
let tenant_id = tenant_shard_id.tenant_id.to_string();
@@ -1504,6 +1599,12 @@ impl SmgrQueryTimePerTimeline {
.get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
.unwrap();
let global_flush_in_progress_micros =
PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL.clone();
let per_timeline_flush_in_progress_micros = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS
.get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
.unwrap();
Self {
global_started,
global_latency,
@@ -1511,6 +1612,8 @@
per_timeline_getpage_started,
global_batch_size,
per_timeline_batch_size,
global_flush_in_progress_micros,
per_timeline_flush_in_progress_micros,
}
}
pub(crate) fn start_smgr_op(&self, op: SmgrQueryType, started_at: Instant) -> SmgrOpTimer {
@@ -1523,13 +1626,17 @@
None
};
SmgrOpTimer(Some(SmgrOpTimerInner {
global_latency_histo: self.global_latency[op as usize].clone(),
per_timeline_latency_histo,
start: started_at,
op,
throttled: Duration::ZERO,
global_flush_in_progress_micros: self.global_flush_in_progress_micros.clone(),
per_timeline_flush_in_progress_micros: self
.per_timeline_flush_in_progress_micros
.clone(),
}))
}
pub(crate) fn observe_getpage_batch_start(&self, batch_size: usize) {
@@ -2204,6 +2311,15 @@ pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| WalIngestMet
.expect("failed to define a metric"),
});
pub(crate) static PAGESERVER_TIMELINE_WAL_RECORDS_RECEIVED: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_timeline_wal_records_received",
"Number of WAL records received per shard",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
pub(crate) static WAL_REDO_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_wal_redo_seconds",
@@ -2431,6 +2547,7 @@ pub(crate) struct TimelineMetrics {
pub evictions_with_low_residence_duration: std::sync::RwLock<EvictionsWithLowResidenceDuration>,
/// Number of valid LSN leases.
pub valid_lsn_lease_count_gauge: UIntGauge,
pub wal_records_received: IntCounter,
shutdown: std::sync::atomic::AtomicBool,
}
@@ -2588,6 +2705,10 @@ impl TimelineMetrics {
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let wal_records_received = PAGESERVER_TIMELINE_WAL_RECORDS_RECEIVED
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
TimelineMetrics {
tenant_id,
shard_id,
@@ -2620,6 +2741,7 @@
evictions_with_low_residence_duration,
),
valid_lsn_lease_count_gauge,
wal_records_received,
shutdown: std::sync::atomic::AtomicBool::default(),
}
}
@@ -2757,6 +2879,16 @@ impl TimelineMetrics {
shard_id,
timeline_id,
]);
let _ = PAGESERVER_TIMELINE_WAL_RECORDS_RECEIVED.remove_label_values(&[
tenant_id,
shard_id,
timeline_id,
]);
let _ = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS.remove_label_values(&[
tenant_id,
shard_id,
timeline_id,
]);
}
}


@@ -1017,10 +1017,8 @@ impl PageServerHandler {
// Map handler result to protocol behavior.
// Some handler errors cause exit from pagestream protocol.
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
for handler_result in handler_results {
let (response_msg, timer) = match handler_result {
Err(e) => match &e {
PageStreamError::Shutdown => {
// If we fail to fulfil a request during shutdown, which may be _because_ of
@@ -1044,34 +1042,66 @@
span.in_scope(|| {
error!("error reading relation or page version: {full:#}")
});
(
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
}),
None, // TODO: measure errors
)
}
},
Ok((response_msg, timer)) => (response_msg, Some(timer)),
};
// marshal & transmit response message
pgb_writer.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
// We purposefully don't count flush time into the timer.
//
// The reason is that current compute client will not perform protocol processing
// if the postgres backend process is doing things other than `->smgr_read()`.
// This is especially the case for prefetch.
//
// If the compute doesn't read from the connection, eventually TCP will backpressure
// all the way into our flush call below.
//
// The timer's underlying metric is used for a storage-internal latency SLO and
// we don't want to include latency in it that we can't control.
// And as pointed out above, in this case, we don't control the time that flush will take.
let flushing_timer =
timer.map(|timer| timer.observe_smgr_op_completion_and_start_flushing());
// what we want to do
let flush_fut = pgb_writer.flush();
// metric for how long flushing takes
let flush_fut = match flushing_timer {
Some(flushing_timer) => {
futures::future::Either::Left(flushing_timer.measure(flush_fut))
}
None => futures::future::Either::Right(flush_fut),
};
// do it while respecting cancellation
let _: () = async move {
tokio::select! {
biased;
_ = cancel.cancelled() => {
// We were requested to shut down.
info!("shutdown request received in page handler");
return Err(QueryError::Shutdown)
}
res = flush_fut => {
res?;
}
}
Ok(())
}
// and log the info! line inside the request span
.instrument(span.clone())
.await?;
}
Ok(())
}


@@ -49,6 +49,7 @@ use timeline::import_pgdata;
use timeline::offload::offload_timeline;
use timeline::CompactFlags;
use timeline::CompactOptions;
use timeline::CompactionError;
use timeline::ShutdownMode;
use tokio::io::BufReader;
use tokio::sync::watch;
@@ -2987,10 +2988,16 @@ impl Tenant {
if has_pending_l0_compaction_task {
Some(true)
} else {
let mut has_pending_scheduled_compaction_task;
let next_scheduled_compaction_task = {
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
if let Some(tline_pending_tasks) = guard.get_mut(timeline_id) {
if !tline_pending_tasks.is_empty() {
info!(
"{} tasks left in the compaction schedule queue",
tline_pending_tasks.len()
);
}
let next_task = tline_pending_tasks.pop_front();
has_pending_scheduled_compaction_task = !tline_pending_tasks.is_empty();
next_task
@@ -3007,6 +3014,41 @@ impl Tenant {
.contains(CompactFlags::EnhancedGcBottomMostCompaction)
{
warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", next_scheduled_compaction_task.options);
} else if next_scheduled_compaction_task.options.sub_compaction {
info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
let jobs = timeline
.gc_compaction_split_jobs(next_scheduled_compaction_task.options)
.await
.map_err(CompactionError::Other)?;
if jobs.is_empty() {
info!("no jobs to run, skipping scheduled compaction task");
} else {
has_pending_scheduled_compaction_task = true;
let jobs_len = jobs.len();
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
let tline_pending_tasks = guard.entry(*timeline_id).or_default();
for (idx, job) in jobs.into_iter().enumerate() {
tline_pending_tasks.push_back(if idx == jobs_len - 1 {
ScheduledCompactionTask {
options: job,
// The last job in the queue sends the signal and releases the gc guard
result_tx: next_scheduled_compaction_task
.result_tx
.take(),
gc_block: next_scheduled_compaction_task
.gc_block
.take(),
}
} else {
ScheduledCompactionTask {
options: job,
result_tx: None,
gc_block: None,
}
});
}
info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len);
}
} else {
let _ = timeline
.compact_with_options(
@@ -3062,15 +3104,22 @@ impl Tenant {
&self,
timeline_id: TimelineId,
options: CompactOptions,
) -> anyhow::Result<tokio::sync::oneshot::Receiver<()>> {
let gc_guard = match self.gc_block.start().await {
Ok(guard) => guard,
Err(e) => {
bail!("cannot run gc-compaction because gc is blocked: {}", e);
}
};
let (tx, rx) = tokio::sync::oneshot::channel();
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
let tline_pending_tasks = guard.entry(timeline_id).or_default();
tline_pending_tasks.push_back(ScheduledCompactionTask {
options,
result_tx: Some(tx),
gc_block: Some(gc_guard),
});
Ok(rx)
}
// Call through to all timelines to freeze ephemeral layers if needed. Usually
@@ -8117,6 +8166,12 @@ mod tests {
)
.await?;
{
tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x30);
@@ -8219,6 +8274,12 @@ mod tests {
// increase GC horizon and compact again
{
tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x40))
.wait()
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x40);
@@ -8599,6 +8660,12 @@ mod tests {
.await?
};
{
tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
*guard = GcInfo {
@@ -8680,6 +8747,12 @@ mod tests {
// increase GC horizon and compact again
{
tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x40))
.wait()
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x40);
@@ -9127,6 +9200,12 @@ mod tests {
)
.await?;
{
tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
*guard = GcInfo {
@@ -9244,7 +9323,7 @@ mod tests {
CompactOptions {
flags: dryrun_flags,
compact_range: None,
..Default::default()
},
&ctx,
)
@@ -9269,6 +9348,12 @@ mod tests {
// increase GC horizon and compact again
{
tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x38))
.wait()
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x38);
@@ -9364,6 +9449,12 @@ mod tests {
)
.await?;
{
tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
*guard = GcInfo {
@@ -9481,7 +9572,7 @@ mod tests {
CompactOptions {
flags: dryrun_flags,
compact_range: None,
..Default::default()
},
&ctx,
)
@@ -9608,6 +9699,12 @@ mod tests {
branch_tline.add_extra_test_dense_keyspace(KeySpace::single(get_key(0)..get_key(10)));
{
parent_tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x10))
.wait()
.await;
// Update GC info
let mut guard = parent_tline.gc_info.write().unwrap();
*guard = GcInfo {
@@ -9622,6 +9719,12 @@ mod tests {
}
{
branch_tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x50))
.wait()
.await;
// Update GC info
let mut guard = branch_tline.gc_info.write().unwrap();
*guard = GcInfo {
@@ -9951,6 +10054,12 @@ mod tests {
.await?;
{
tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
*guard = GcInfo {
@@ -9973,7 +10082,7 @@ mod tests {
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(0)..get_key(2)).into()),
..Default::default()
},
&ctx,
)
@@ -10020,7 +10129,7 @@ mod tests {
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(2)..get_key(4)).into()),
..Default::default()
},
&ctx,
)
@@ -10072,7 +10181,7 @@ mod tests {
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(4)..get_key(9)).into()),
..Default::default()
},
&ctx,
)
@@ -10123,7 +10232,7 @@ mod tests {
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(9)..get_key(10)).into()),
..Default::default()
},
&ctx,
)
@@ -10179,7 +10288,7 @@ mod tests {
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(0)..get_key(10)).into()),
..Default::default()
},
&ctx,
)


@@ -1,4 +1,4 @@
use std::{collections::HashMap, sync::Arc};
use utils::id::TimelineId;
@@ -20,7 +20,7 @@ pub(crate) struct GcBlock {
/// Do not add any more features taking and forbidding taking this lock. It should be
/// `tokio::sync::Notify`, but that is rarely used. On the other side, [`GcBlock::insert`]
/// synchronizes with gc attempts by locking and unlocking this mutex.
blocking: Arc<tokio::sync::Mutex<()>>,
}
impl GcBlock {
@@ -30,7 +30,7 @@ impl GcBlock {
/// it's ending, or if not currently possible, a value describing the reasons why not.
///
/// Cancellation safe.
pub(super) async fn start(&self) -> Result<Guard, BlockingReasons> {
let reasons = {
let g = self.reasons.lock().unwrap();
@@ -44,7 +44,7 @@ impl GcBlock {
Err(reasons)
} else {
Ok(Guard {
_inner: self.blocking.clone().lock_owned().await,
})
}
}
@@ -170,8 +170,8 @@ impl GcBlock {
}
}
pub(crate) struct Guard {
_inner: tokio::sync::OwnedMutexGuard<()>,
}
#[derive(Debug)]


@@ -785,6 +785,9 @@ pub(crate) struct CompactRequest {
/// Whether the compaction job should be scheduled.
#[serde(default)]
pub scheduled: bool,
/// Whether the compaction job should be split across key ranges.
#[serde(default)]
pub sub_compaction: bool,
}
#[serde_with::serde_as]
@@ -814,6 +817,9 @@ pub(crate) struct CompactOptions {
/// If set, the compaction will only compact the LSN below this value.
/// This option is only used by GC compaction.
pub compact_below_lsn: Option<Lsn>,
/// Enable sub-compaction (split compaction job across key ranges).
/// This option is only used by GC compaction.
pub sub_compaction: bool,
}
impl std::fmt::Debug for Timeline {
@@ -1637,6 +1643,7 @@ impl Timeline {
flags,
compact_range: None,
compact_below_lsn: None,
sub_compaction: false,
},
ctx,
)


@@ -10,8 +10,8 @@ use std::sync::Arc;
use super::layer_manager::LayerManager;
use super::{
CompactFlags, CompactOptions, CompactRange, CreateImageLayersError, DurationRecorder,
ImageLayerCreationMode, RecordedDuration, Timeline,
};
use anyhow::{anyhow, bail, Context};
@@ -29,7 +29,6 @@ use utils::id::TimelineId;
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;
use crate::statvfs::Statvfs;
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::storage_layer::batch_split_writer::{
BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
@@ -42,7 +41,7 @@
use crate::tenant::timeline::ImageLayerCreationOutcome;
use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
use crate::tenant::timeline::{Layer, ResidentLayer};
use crate::tenant::{gc_block, DeltaLayer, MaybeOffloaded};
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
use pageserver_api::config::tenant_conf_defaults::{
DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
@@ -64,9 +63,12 @@ use super::CompactionError;
const COMPACTION_DELTA_THRESHOLD: usize = 5;
/// A scheduled compaction task.
pub(crate) struct ScheduledCompactionTask {
pub options: CompactOptions,
/// The channel to send the compaction result. If this is a subcompaction, the last compaction job holds the sender.
pub result_tx: Option<tokio::sync::oneshot::Sender<()>>,
/// Hold the GC block. If this is a subcompaction, the last compaction job holds the gc block guard.
pub gc_block: Option<gc_block::Guard>,
}
pub struct GcCompactionJobDescription {
@@ -1752,6 +1754,115 @@ impl Timeline {
Ok(())
}
/// Split a gc-compaction job into multiple compaction jobs. Optimally, this function should return a vector of
/// `GcCompactionJobDesc`. But we want to keep it simple on the tenant scheduling side without exposing too much
/// ad-hoc information about gc compaction itself.
pub(crate) async fn gc_compaction_split_jobs(
self: &Arc<Self>,
options: CompactOptions,
) -> anyhow::Result<Vec<CompactOptions>> {
if !options.sub_compaction {
return Ok(vec![options]);
}
let compact_range = options.compact_range.clone().unwrap_or(CompactRange {
start: Key::MIN,
end: Key::MAX,
});
let compact_below_lsn = if let Some(compact_below_lsn) = options.compact_below_lsn {
compact_below_lsn
} else {
*self.get_latest_gc_cutoff_lsn() // use the real gc cutoff
};
let mut compact_jobs = Vec::new();
// For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
// by estimating the amount of files read for a compaction job. We should also partition on LSN.
let Ok(partition) = self.partitioning.try_lock() else {
bail!("failed to acquire partition lock");
};
let ((dense_ks, sparse_ks), _) = &*partition;
// Truncate the key range to be within user specified compaction range.
fn truncate_to(
source_start: &Key,
source_end: &Key,
target_start: &Key,
target_end: &Key,
) -> Option<(Key, Key)> {
let start = source_start.max(target_start);
let end = source_end.min(target_end);
if start < end {
Some((*start, *end))
} else {
None
}
}
let mut split_key_ranges = Vec::new();
let ranges = dense_ks
.parts
.iter()
.map(|partition| partition.ranges.iter())
.chain(sparse_ks.parts.iter().map(|x| x.0.ranges.iter()))
.flatten()
.cloned()
.collect_vec();
for range in ranges.iter() {
let Some((start, end)) = truncate_to(
&range.start,
&range.end,
&compact_range.start,
&compact_range.end,
) else {
continue;
};
split_key_ranges.push((start, end));
}
split_key_ranges.sort();
let guard = self.layers.read().await;
let layer_map = guard.layer_map()?;
let mut current_start = None;
// Split the compaction job into pieces of about GC_COMPACT_MAX_SIZE_MB each
const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024; // 4GB, TODO: should be configuration in the future
let ranges_num = split_key_ranges.len();
for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() {
if current_start.is_none() {
current_start = Some(start);
}
let start = current_start.unwrap();
if start >= end {
// We have already processed this partition.
continue;
}
let res = layer_map.range_search(start..end, compact_below_lsn);
let total_size = res.found.keys().map(|x| x.layer.file_size()).sum::<u64>();
if total_size > GC_COMPACT_MAX_SIZE_MB * 1024 * 1024 || ranges_num == idx + 1 {
let mut compact_options = options.clone();
// Try to extend the compaction range so that we include at least one full layer file.
let extended_end = res
.found
.keys()
.map(|layer| layer.layer.key_range.end)
.min();
// It is possible that the search range does not contain any layer files when we reach the end of the loop.
// In this case, we simply use the specified key range end.
let end = if let Some(extended_end) = extended_end {
extended_end.max(end)
} else {
end
};
info!(
"splitting compaction job: {}..{}, estimated_size={}",
start, end, total_size
);
compact_options.compact_range = Some(CompactRange { start, end });
compact_options.compact_below_lsn = Some(compact_below_lsn);
compact_options.sub_compaction = false;
compact_jobs.push(compact_options);
current_start = Some(end);
}
}
drop(guard);
Ok(compact_jobs)
}
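The splitting function above walks sorted key ranges and cuts a new job whenever the layers under the accumulated range exceed a size threshold (or the ranges run out). A self-contained sketch of that greedy cut with integer keys and precomputed range sizes, a simplification of the layer-map search the real code performs (all names hypothetical):

```rust
/// Greedily merge adjacent (start_key, end_key, size_mb) ranges into jobs
/// of roughly `max_job_size` each, mirroring the size-threshold cut above.
fn split_jobs(ranges: &[(u64, u64, u64)], max_job_size: u64) -> Vec<(u64, u64)> {
    let mut jobs = Vec::new();
    let mut current_start: Option<u64> = None;
    let mut acc_size = 0u64;
    for (idx, &(start, end, size)) in ranges.iter().enumerate() {
        let job_start = *current_start.get_or_insert(start);
        if job_start >= end {
            continue; // this partition is already covered by a previous job
        }
        acc_size += size;
        // Cut a job when we exceed the threshold or reach the last range.
        if acc_size > max_job_size || idx + 1 == ranges.len() {
            jobs.push((job_start, end));
            current_start = Some(end);
            acc_size = 0;
        }
    }
    jobs
}

fn main() {
    // ranges as (start_key, end_key, size_mb)
    let ranges = [(0, 10, 1500), (10, 20, 1500), (20, 30, 100)];
    let jobs = split_jobs(&ranges, 2048);
    // 1500 + 1500 > 2048 cuts the first job at key 20; the tail forms the last job.
    assert_eq!(jobs, vec![(0, 20), (20, 30)]);
    println!("{:?}", jobs);
}
```

The real code additionally extends each cut to the end of the last layer file it touched, so a job never stops mid-layer; that refinement is omitted here.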
/// An experimental compaction building block that combines compaction with garbage collection.
///
/// The current implementation picks all delta + image layers that are below or intersecting with
@@ -1774,6 +1885,36 @@ impl Timeline {
options: CompactOptions,
ctx: &RequestContext,
) -> anyhow::Result<()> {
if options.sub_compaction {
info!("running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
let jobs = self.gc_compaction_split_jobs(options).await?;
let jobs_len = jobs.len();
for (idx, job) in jobs.into_iter().enumerate() {
info!(
"running enhanced gc bottom-most compaction, sub-compaction {}/{}",
idx + 1,
jobs_len
);
self.compact_with_gc_inner(cancel, job, ctx).await?;
}
if jobs_len == 0 {
info!("no jobs to run, skipping gc bottom-most compaction");
}
return Ok(());
}
self.compact_with_gc_inner(cancel, options, ctx).await
}
async fn compact_with_gc_inner(
self: &Arc<Self>,
cancel: &CancellationToken,
options: CompactOptions,
ctx: &RequestContext,
) -> anyhow::Result<()> {
assert!(
!options.sub_compaction,
"sub-compaction should be handled by the outer function"
);
// Block other compaction/GC tasks from running for now. GC-compaction could run along
// with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
// Note that we already acquired the compaction lock when the outer `compact` function gets called.
@@ -1823,7 +1964,11 @@ impl Timeline {
let gc_info = self.gc_info.read().unwrap();
let mut retain_lsns_below_horizon = Vec::new();
let gc_cutoff = {
// Currently, gc-compaction only kicks in after the legacy gc has updated the gc_cutoff.
// Therefore, it can only clean up data that cannot be cleaned up with legacy gc, instead of
// cleaning everything that theoretically it could. In the future, it should use `self.gc_info`
// to get the truth data.
let real_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
// The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for
// each of the retain_lsn. Therefore, if the user-provided `compact_below_lsn` is larger than the real gc cutoff, we will use
// the real cutoff.
@@ -1943,14 +2088,15 @@ impl Timeline {
// Step 1: construct a k-merge iterator over all layers.
// Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
// disable the check for now because we need to adjust the check for partial compactions, will enable later.
// let layer_names = job_desc
//     .selected_layers
//     .iter()
//     .map(|layer| layer.layer_desc().layer_name())
//     .collect_vec();
// if let Some(err) = check_valid_layermap(&layer_names) {
//     warn!("gc-compaction layer map check failed because {}, this is normal if partial compaction is not finished yet", err);
// }
// The maximum LSN we are processing in this compaction loop
let end_lsn = job_desc
.selected_layers


@@ -369,6 +369,13 @@ pub(super) async fn handle_walreceiver_connection(
// advances it to its end LSN. 0 is just an initialization placeholder.
let mut modification = timeline.begin_modification(Lsn(0));
if !records.is_empty() {
timeline
.metrics
.wal_records_received
.inc_by(records.len() as u64);
}
for interpreted in records {
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
&& uncommitted_records > 0
@@ -510,6 +517,7 @@ pub(super) async fn handle_walreceiver_connection(
}
// Ingest the records without immediately committing them.
timeline.metrics.wal_records_received.inc();
let ingested = walingest
.ingest_record(interpreted, &mut modification, &ctx)
.await


@@ -610,6 +610,9 @@ prefetch_read(PrefetchRequest *slot)
{
NeonResponse *response;
MemoryContext old;
BufferTag buftag;
shardno_t shard_no;
uint64 my_ring_index;
Assert(slot->status == PRFS_REQUESTED);
Assert(slot->response == NULL);
@@ -623,11 +626,29 @@ prefetch_read(PrefetchRequest *slot)
slot->status, slot->response,
(long)slot->my_ring_index, (long)MyPState->ring_receive);
/*
* Copy the request info so that if an error happens and the prefetch
* queue is flushed during the receive call, we can print the original
* values in the error message
*/
buftag = slot->buftag;
shard_no = slot->shard_no;
my_ring_index = slot->my_ring_index;
old = MemoryContextSwitchTo(MyPState->errctx); old = MemoryContextSwitchTo(MyPState->errctx);
response = (NeonResponse *) page_server->receive(slot->shard_no); response = (NeonResponse *) page_server->receive(shard_no);
MemoryContextSwitchTo(old); MemoryContextSwitchTo(old);
if (response) if (response)
{ {
/* The slot should still be valid */
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
neon_shard_log(shard_no, ERROR,
"Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu",
slot->status, slot->response,
(long) slot->my_ring_index, (long) MyPState->ring_receive);
/* update prefetch state */ /* update prefetch state */
MyPState->n_responses_buffered += 1; MyPState->n_responses_buffered += 1;
MyPState->n_requests_inflight -= 1; MyPState->n_requests_inflight -= 1;
@@ -642,11 +663,15 @@ prefetch_read(PrefetchRequest *slot)
} }
else else
{ {
neon_shard_log(slot->shard_no, LOG, /*
* Note: The slot might no longer be valid, if the connection was lost
* and the prefetch queue was flushed during the receive call
*/
neon_shard_log(shard_no, LOG,
"No response from reading prefetch entry %lu: %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect", "No response from reading prefetch entry %lu: %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect",
(long)slot->my_ring_index, (long) my_ring_index,
RelFileInfoFmt(BufTagGetNRelFileInfo(slot->buftag)), RelFileInfoFmt(BufTagGetNRelFileInfo(buftag)),
slot->buftag.forkNum, slot->buftag.blockNum); buftag.forkNum, buftag.blockNum);
return false; return false;
} }
} }
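The C change above snapshots `buftag`, `shard_no`, and `my_ring_index` before the receive call, because a connection loss can flush the prefetch queue mid-call and invalidate the slot the error message would otherwise read from. A small Rust sketch of the same defensive pattern, with illustrative names and a simulated queue flush:

```rust
struct Slot {
    ring_index: u64,
    block_no: u32,
}

// Stand-in for page_server->receive(): a connection loss flushes the
// whole prefetch queue before returning "no response".
fn receive(slots: &mut Vec<Slot>) -> Option<u32> {
    slots.clear();
    None
}

fn read_slot(slots: &mut Vec<Slot>) -> Result<u32, String> {
    // Copy out what we need for diagnostics *before* receive() runs,
    // since the slot may be gone afterwards.
    let ring_index = slots[0].ring_index;
    let block_no = slots[0].block_no;
    match receive(slots) {
        Some(page) => Ok(page),
        // The slot was invalidated; report the saved copies instead.
        None => Err(format!(
            "no response for prefetch entry {ring_index}, block {block_no}"
        )),
    }
}

fn main() {
    let mut slots = vec![Slot { ring_index: 3, block_no: 42 }];
    let err = read_slot(&mut slots).unwrap_err();
    assert!(err.contains("entry 3"));
    println!("{err}");
}
```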

View File

@@ -83,14 +83,20 @@ impl Env {
         node_id: NodeId,
         ttid: TenantTimelineId,
     ) -> anyhow::Result<Arc<Timeline>> {
-        let conf = self.make_conf(node_id);
+        let conf = Arc::new(self.make_conf(node_id));
         let timeline_dir = get_timeline_dir(&conf, &ttid);
         let remote_path = remote_timeline_path(&ttid)?;

         let safekeeper = self.make_safekeeper(node_id, ttid).await?;
         let shared_state = SharedState::new(StateSK::Loaded(safekeeper));
-        let timeline = Timeline::new(ttid, &timeline_dir, &remote_path, shared_state);
+        let timeline = Timeline::new(
+            ttid,
+            &timeline_dir,
+            &remote_path,
+            shared_state,
+            conf.clone(),
+        );
         timeline.bootstrap(
             &mut timeline.write_shared_state().await,
             &conf,
View File

@@ -338,7 +338,7 @@ async fn main() -> anyhow::Result<()> {
         }
     };

-    let conf = SafeKeeperConf {
+    let conf = Arc::new(SafeKeeperConf {
         workdir,
         my_id: id,
         listen_pg_addr: args.listen_pg,
@@ -368,7 +368,7 @@ async fn main() -> anyhow::Result<()> {
         control_file_save_interval: args.control_file_save_interval,
         partial_backup_concurrency: args.partial_backup_concurrency,
         eviction_min_resident: args.eviction_min_resident,
-    };
+    });

     // initialize sentry if SENTRY_DSN is provided
     let _sentry_guard = init_sentry(
@@ -382,7 +382,7 @@ async fn main() -> anyhow::Result<()> {
 /// complete, e.g. panicked, inner is error produced by task itself.
 type JoinTaskRes = Result<anyhow::Result<()>, JoinError>;

-async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
+async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
     // fsync the datadir to make sure we have a consistent state on disk.
     if !conf.no_sync {
         let dfd = File::open(&conf.workdir).context("open datadir for syncfs")?;
@@ -428,9 +428,11 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
             e
         })?;

+    let global_timelines = Arc::new(GlobalTimelines::new(conf.clone()));
+
     // Register metrics collector for active timelines. It's important to do this
     // after daemonizing, otherwise process collector will be upset.
-    let timeline_collector = safekeeper::metrics::TimelineCollector::new();
+    let timeline_collector = safekeeper::metrics::TimelineCollector::new(global_timelines.clone());
     metrics::register_internal(Box::new(timeline_collector))?;

     wal_backup::init_remote_storage(&conf).await;
@@ -447,9 +449,8 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
         .then(|| Handle::try_current().expect("no runtime in main"));

     // Load all timelines from disk to memory.
-    GlobalTimelines::init(conf.clone()).await?;
+    global_timelines.init().await?;

-    let conf_ = conf.clone();
     // Run everything in current thread rt, if asked.
     if conf.current_thread_runtime {
         info!("running in current thread runtime");
@@ -459,14 +460,16 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
         .as_ref()
         .unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
         .spawn(wal_service::task_main(
-            conf_,
+            conf.clone(),
             pg_listener,
             Scope::SafekeeperData,
+            global_timelines.clone(),
         ))
         // wrap with task name for error reporting
         .map(|res| ("WAL service main".to_owned(), res));
     tasks_handles.push(Box::pin(wal_service_handle));

+    let global_timelines_ = global_timelines.clone();
     let timeline_housekeeping_handle = current_thread_rt
         .as_ref()
         .unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
@@ -474,40 +477,45 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
             const TOMBSTONE_TTL: Duration = Duration::from_secs(3600 * 24);
             loop {
                 tokio::time::sleep(TOMBSTONE_TTL).await;
-                GlobalTimelines::housekeeping(&TOMBSTONE_TTL);
+                global_timelines_.housekeeping(&TOMBSTONE_TTL);
             }
         })
         .map(|res| ("Timeline map housekeeping".to_owned(), res));
     tasks_handles.push(Box::pin(timeline_housekeeping_handle));

     if let Some(pg_listener_tenant_only) = pg_listener_tenant_only {
-        let conf_ = conf.clone();
         let wal_service_handle = current_thread_rt
             .as_ref()
             .unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
             .spawn(wal_service::task_main(
-                conf_,
+                conf.clone(),
                 pg_listener_tenant_only,
                 Scope::Tenant,
+                global_timelines.clone(),
             ))
             // wrap with task name for error reporting
             .map(|res| ("WAL service tenant only main".to_owned(), res));
         tasks_handles.push(Box::pin(wal_service_handle));
     }

-    let conf_ = conf.clone();
     let http_handle = current_thread_rt
         .as_ref()
         .unwrap_or_else(|| HTTP_RUNTIME.handle())
-        .spawn(http::task_main(conf_, http_listener))
+        .spawn(http::task_main(
+            conf.clone(),
+            http_listener,
+            global_timelines.clone(),
+        ))
         .map(|res| ("HTTP service main".to_owned(), res));
     tasks_handles.push(Box::pin(http_handle));

-    let conf_ = conf.clone();
     let broker_task_handle = current_thread_rt
         .as_ref()
         .unwrap_or_else(|| BROKER_RUNTIME.handle())
-        .spawn(broker::task_main(conf_).instrument(info_span!("broker")))
+        .spawn(
+            broker::task_main(conf.clone(), global_timelines.clone())
+                .instrument(info_span!("broker")),
+        )
         .map(|res| ("broker main".to_owned(), res));
     tasks_handles.push(Box::pin(broker_task_handle));
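The changes above replace the per-task `conf_` clones of the whole `SafeKeeperConf` struct with a single `Arc` that each spawned task clones, so only a pointer is copied and every task reads the same shared config. A minimal std-only sketch of the pattern, using threads instead of tokio tasks and a hypothetical two-field `Conf`:

```rust
use std::sync::Arc;
use std::thread;

// Stand-in for SafeKeeperConf: cloning the Arc copies a pointer,
// not the (potentially large) struct itself.
struct Conf {
    my_id: u64,
    listen_pg_addr: String,
}

// Each service task receives its own Arc clone and reads through it.
fn spawn_service(conf: Arc<Conf>) -> thread::JoinHandle<u64> {
    thread::spawn(move || conf.my_id)
}

fn main() {
    let conf = Arc::new(Conf {
        my_id: 7,
        listen_pg_addr: "0.0.0.0:5454".into(),
    });
    let handles: Vec<_> = (0..3).map(|_| spawn_service(conf.clone())).collect();
    for h in handles {
        assert_eq!(h.join().unwrap(), 7);
    }
    println!("all services read id {} for {}", conf.my_id, conf.listen_pg_addr);
}
```

The same shape carries over to `tokio::spawn`: clone the `Arc` at each spawn site instead of cloning the struct into a `conf_` temporary.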

View File

@@ -39,14 +39,17 @@ const RETRY_INTERVAL_MSEC: u64 = 1000;
 const PUSH_INTERVAL_MSEC: u64 = 1000;

 /// Push once in a while data about all active timelines to the broker.
-async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
+async fn push_loop(
+    conf: Arc<SafeKeeperConf>,
+    global_timelines: Arc<GlobalTimelines>,
+) -> anyhow::Result<()> {
     if conf.disable_periodic_broker_push {
         info!("broker push_loop is disabled, doing nothing...");
         futures::future::pending::<()>().await; // sleep forever
         return Ok(());
     }

-    let active_timelines_set = GlobalTimelines::get_global_broker_active_set();
+    let active_timelines_set = global_timelines.get_global_broker_active_set();

     let mut client =
         storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
@@ -87,8 +90,13 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
 /// Subscribe and fetch all the interesting data from the broker.
 #[instrument(name = "broker_pull", skip_all)]
-async fn pull_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()> {
-    let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?;
+async fn pull_loop(
+    conf: Arc<SafeKeeperConf>,
+    global_timelines: Arc<GlobalTimelines>,
+    stats: Arc<BrokerStats>,
+) -> Result<()> {
+    let mut client =
+        storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;

     // TODO: subscribe only to local timelines instead of all
     let request = SubscribeSafekeeperInfoRequest {
@@ -113,7 +121,7 @@ async fn pull_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()>
             .as_ref()
             .ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
         let ttid = parse_proto_ttid(proto_ttid)?;
-        if let Ok(tli) = GlobalTimelines::get(ttid) {
+        if let Ok(tli) = global_timelines.get(ttid) {
             // Note that we also receive *our own* info. That's
             // important, as it is used as an indication of live
             // connection to the broker.
@@ -135,7 +143,11 @@ async fn pull_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()>
 /// Process incoming discover requests. This is done in a separate task to avoid
 /// interfering with the normal pull/push loops.
-async fn discover_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()> {
+async fn discover_loop(
+    conf: Arc<SafeKeeperConf>,
+    global_timelines: Arc<GlobalTimelines>,
+    stats: Arc<BrokerStats>,
+) -> Result<()> {
     let mut client =
         storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
@@ -171,7 +183,7 @@ async fn discover_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<
             .as_ref()
             .ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
         let ttid = parse_proto_ttid(proto_ttid)?;
-        if let Ok(tli) = GlobalTimelines::get(ttid) {
+        if let Ok(tli) = global_timelines.get(ttid) {
             // we received a discovery request for a timeline we know about
             discover_counter.inc();
@@ -210,7 +222,10 @@ async fn discover_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<
     bail!("end of stream");
 }

-pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
+pub async fn task_main(
+    conf: Arc<SafeKeeperConf>,
+    global_timelines: Arc<GlobalTimelines>,
+) -> anyhow::Result<()> {
     info!("started, broker endpoint {:?}", conf.broker_endpoint);

     let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC));
@@ -261,13 +276,13 @@ pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
             },
             _ = ticker.tick() => {
                 if push_handle.is_none() {
-                    push_handle = Some(tokio::spawn(push_loop(conf.clone())));
+                    push_handle = Some(tokio::spawn(push_loop(conf.clone(), global_timelines.clone())));
                 }
                 if pull_handle.is_none() {
-                    pull_handle = Some(tokio::spawn(pull_loop(conf.clone(), stats.clone())));
+                    pull_handle = Some(tokio::spawn(pull_loop(conf.clone(), global_timelines.clone(), stats.clone())));
                 }
                 if discover_handle.is_none() {
-                    discover_handle = Some(tokio::spawn(discover_loop(conf.clone(), stats.clone())));
+                    discover_handle = Some(tokio::spawn(discover_loop(conf.clone(), global_timelines.clone(), stats.clone())));
                 }
             },
             _ = &mut stats_task => {}

View File

@@ -1,9 +1,7 @@
-use std::sync::Arc;
-
 use anyhow::{bail, Result};
 use camino::Utf8PathBuf;
 use postgres_ffi::{MAX_SEND_SIZE, WAL_SEGMENT_SIZE};
+use std::sync::Arc;
 use tokio::{
     fs::OpenOptions,
     io::{AsyncSeekExt, AsyncWriteExt},
@@ -14,7 +12,7 @@ use utils::{id::TenantTimelineId, lsn::Lsn};
 use crate::{
     control_file::FileStorage,
     state::TimelinePersistentState,
-    timeline::{Timeline, TimelineError, WalResidentTimeline},
+    timeline::{TimelineError, WalResidentTimeline},
     timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline},
     wal_backup::copy_s3_segments,
     wal_storage::{wal_file_paths, WalReader},
@@ -25,16 +23,19 @@ use crate::{
 const MAX_BACKUP_LAG: u64 = 10 * WAL_SEGMENT_SIZE as u64;

 pub struct Request {
-    pub source: Arc<Timeline>,
+    pub source_ttid: TenantTimelineId,
     pub until_lsn: Lsn,
     pub destination_ttid: TenantTimelineId,
 }

-pub async fn handle_request(request: Request) -> Result<()> {
+pub async fn handle_request(
+    request: Request,
+    global_timelines: Arc<GlobalTimelines>,
+) -> Result<()> {
     // TODO: request.until_lsn MUST be a valid LSN, and we cannot check it :(
     // if LSN will point to the middle of a WAL record, timeline will be in "broken" state
-    match GlobalTimelines::get(request.destination_ttid) {
+    match global_timelines.get(request.destination_ttid) {
         // timeline already exists. would be good to check that this timeline is the copy
         // of the source timeline, but it isn't obvious how to do that
         Ok(_) => return Ok(()),
@@ -46,9 +47,10 @@ pub async fn handle_request(request: Request) -> Result<()> {
         }
     }

-    let source_tli = request.source.wal_residence_guard().await?;
+    let source = global_timelines.get(request.source_ttid)?;
+    let source_tli = source.wal_residence_guard().await?;

-    let conf = &GlobalTimelines::get_global_config();
+    let conf = &global_timelines.get_global_config();
     let ttid = request.destination_ttid;
     let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
@@ -127,7 +129,7 @@ pub async fn handle_request(request: Request) -> Result<()> {
     copy_s3_segments(
         wal_seg_size,
-        &request.source.ttid,
+        &request.source_ttid,
         &request.destination_ttid,
         first_segment,
         first_ondisk_segment,
@@ -158,7 +160,9 @@ pub async fn handle_request(request: Request) -> Result<()> {
     // now we have a ready timeline in a temp directory
     validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;

-    GlobalTimelines::load_temp_timeline(request.destination_ttid, &tli_dir_path, true).await?;
+    global_timelines
+        .load_temp_timeline(request.destination_ttid, &tli_dir_path, true)
+        .await?;

     Ok(())
 }
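With this change, `Request` carries a `source_ttid` and the handler resolves the source timeline through the shared `GlobalTimelines` map, instead of embedding an `Arc<Timeline>` in the request. A simplified sketch of that lookup; every type here is a stand-in for the real one:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Stand-in for utils::id::TenantTimelineId.
type TenantTimelineId = (u64, u64);

struct Timeline {
    ttid: TenantTimelineId,
}

// Stand-in for the shared timelines map: id -> Arc<Timeline>.
struct GlobalTimelines {
    map: HashMap<TenantTimelineId, Arc<Timeline>>,
}

impl GlobalTimelines {
    fn get(&self, ttid: TenantTimelineId) -> Result<Arc<Timeline>, String> {
        self.map
            .get(&ttid)
            .cloned()
            .ok_or_else(|| format!("timeline {ttid:?} not found"))
    }
}

struct Request {
    source_ttid: TenantTimelineId,
}

// The request names the source by id; the handler resolves it itself.
fn handle_request(req: Request, gt: &GlobalTimelines) -> Result<TenantTimelineId, String> {
    let source = gt.get(req.source_ttid)?;
    Ok(source.ttid)
}

fn main() {
    let ttid = (1, 2);
    let mut map = HashMap::new();
    map.insert(ttid, Arc::new(Timeline { ttid }));
    let gt = GlobalTimelines { map };
    assert_eq!(handle_request(Request { source_ttid: ttid }, &gt).unwrap(), ttid);
}
```

Passing the id keeps the request serializable and lets the lookup fail cleanly if the timeline was deleted in the meantime.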

View File

@@ -207,23 +207,23 @@ pub struct FileInfo {
 }

 /// Build debug dump response, using the provided [`Args`] filters.
-pub async fn build(args: Args) -> Result<Response> {
+pub async fn build(args: Args, global_timelines: Arc<GlobalTimelines>) -> Result<Response> {
     let start_time = Utc::now();
-    let timelines_count = GlobalTimelines::timelines_count();
-    let config = GlobalTimelines::get_global_config();
+    let timelines_count = global_timelines.timelines_count();
+    let config = global_timelines.get_global_config();

     let ptrs_snapshot = if args.tenant_id.is_some() && args.timeline_id.is_some() {
         // If both tenant_id and timeline_id are specified, we can just get the
         // timeline directly, without taking a snapshot of the whole list.
         let ttid = TenantTimelineId::new(args.tenant_id.unwrap(), args.timeline_id.unwrap());
-        if let Ok(tli) = GlobalTimelines::get(ttid) {
+        if let Ok(tli) = global_timelines.get(ttid) {
             vec![tli]
         } else {
             vec![]
         }
     } else {
         // Otherwise, take a snapshot of the whole list.
-        GlobalTimelines::get_all()
+        global_timelines.get_all()
     };

     let mut timelines = Vec::new();
@@ -344,12 +344,12 @@ fn get_wal_last_modified(path: &Utf8Path) -> Result<Option<DateTime<Utc>>> {
 /// Converts SafeKeeperConf to Config, filtering out the fields that are not
 /// supposed to be exposed.
-fn build_config(config: SafeKeeperConf) -> Config {
+fn build_config(config: Arc<SafeKeeperConf>) -> Config {
     Config {
         id: config.my_id,
-        workdir: config.workdir.into(),
-        listen_pg_addr: config.listen_pg_addr,
-        listen_http_addr: config.listen_http_addr,
+        workdir: config.workdir.clone().into(),
+        listen_pg_addr: config.listen_pg_addr.clone(),
+        listen_http_addr: config.listen_http_addr.clone(),
         no_sync: config.no_sync,
         max_offloader_lag_bytes: config.max_offloader_lag_bytes,
         wal_backup_enabled: config.wal_backup_enabled,
View File

@@ -33,7 +33,7 @@ use utils::{
 /// Safekeeper handler of postgres commands
 pub struct SafekeeperPostgresHandler {
-    pub conf: SafeKeeperConf,
+    pub conf: Arc<SafeKeeperConf>,
     /// assigned application name
     pub appname: Option<String>,
     pub tenant_id: Option<TenantId>,
@@ -43,6 +43,7 @@ pub struct SafekeeperPostgresHandler {
     pub protocol: Option<PostgresClientProtocol>,
     /// Unique connection id is logged in spans for observability.
     pub conn_id: ConnectionId,
+    pub global_timelines: Arc<GlobalTimelines>,
     /// Auth scope allowed on the connections and public key used to check auth tokens. None if auth is not configured.
     auth: Option<(Scope, Arc<JwtAuth>)>,
     claims: Option<Claims>,
@@ -314,10 +315,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
 impl SafekeeperPostgresHandler {
     pub fn new(
-        conf: SafeKeeperConf,
+        conf: Arc<SafeKeeperConf>,
         conn_id: u32,
         io_metrics: Option<TrafficMetrics>,
         auth: Option<(Scope, Arc<JwtAuth>)>,
+        global_timelines: Arc<GlobalTimelines>,
     ) -> Self {
         SafekeeperPostgresHandler {
             conf,
@@ -331,6 +333,7 @@ impl SafekeeperPostgresHandler {
             claims: None,
             auth,
             io_metrics,
+            global_timelines,
         }
     }
@@ -360,7 +363,7 @@ impl SafekeeperPostgresHandler {
         pgb: &mut PostgresBackend<IO>,
     ) -> Result<(), QueryError> {
         // Get timeline, handling "not found" error
-        let tli = match GlobalTimelines::get(self.ttid) {
+        let tli = match self.global_timelines.get(self.ttid) {
             Ok(tli) => Ok(Some(tli)),
             Err(TimelineError::NotFound(_)) => Ok(None),
             Err(e) => Err(QueryError::Other(e.into())),
@@ -394,7 +397,10 @@ impl SafekeeperPostgresHandler {
         &mut self,
         pgb: &mut PostgresBackend<IO>,
     ) -> Result<(), QueryError> {
-        let tli = GlobalTimelines::get(self.ttid).map_err(|e| QueryError::Other(e.into()))?;
+        let tli = self
+            .global_timelines
+            .get(self.ttid)
+            .map_err(|e| QueryError::Other(e.into()))?;

         let lsn = if self.is_walproposer_recovery() {
             // walproposer should get all local WAL until flush_lsn
View File

@@ -3,14 +3,16 @@ pub mod routes;
 pub use routes::make_router;
 pub use safekeeper_api::models;

-use crate::SafeKeeperConf;
+use std::sync::Arc;
+
+use crate::{GlobalTimelines, SafeKeeperConf};

 pub async fn task_main(
-    conf: SafeKeeperConf,
+    conf: Arc<SafeKeeperConf>,
     http_listener: std::net::TcpListener,
+    global_timelines: Arc<GlobalTimelines>,
 ) -> anyhow::Result<()> {
-    let router = make_router(conf)
+    let router = make_router(conf, global_timelines)
         .build()
         .map_err(|err| anyhow::anyhow!(err))?;
     let service = utils::http::RouterService::new(router).unwrap();

View File

@@ -66,6 +66,13 @@ fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
         .as_ref()
 }

+fn get_global_timelines(request: &Request<Body>) -> Arc<GlobalTimelines> {
+    request
+        .data::<Arc<GlobalTimelines>>()
+        .expect("unknown state type")
+        .clone()
+}
+
 /// Same as TermLsn, but serializes LSN using display serializer
 /// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
 #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
@@ -123,9 +130,11 @@ async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Bo
     let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
     check_permission(&request, Some(tenant_id))?;
     ensure_no_body(&mut request).await?;
+    let global_timelines = get_global_timelines(&request);
     // FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons;
     // Using an `InternalServerError` should be fixed when the types support it
-    let delete_info = GlobalTimelines::delete_force_all_for_tenant(&tenant_id, only_local)
+    let delete_info = global_timelines
+        .delete_force_all_for_tenant(&tenant_id, only_local)
         .await
         .map_err(ApiError::InternalServerError)?;
     json_response(
@@ -156,7 +165,9 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
             .commit_lsn
             .segment_lsn(server_info.wal_seg_size as usize)
     });
-    GlobalTimelines::create(ttid, server_info, request_data.commit_lsn, local_start_lsn)
+    let global_timelines = get_global_timelines(&request);
+    global_timelines
+        .create(ttid, server_info, request_data.commit_lsn, local_start_lsn)
         .await
         .map_err(ApiError::InternalServerError)?;
@@ -167,7 +178,9 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
 /// Note: it is possible to do the same with debug_dump.
 async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     check_permission(&request, None)?;
-    let res: Vec<TenantTimelineId> = GlobalTimelines::get_all()
+    let global_timelines = get_global_timelines(&request);
+    let res: Vec<TenantTimelineId> = global_timelines
+        .get_all()
         .iter()
         .map(|tli| tli.ttid)
         .collect();
@@ -182,7 +195,8 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
     );
     check_permission(&request, Some(ttid.tenant_id))?;

-    let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
+    let global_timelines = get_global_timelines(&request);
+    let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
     let (inmem, state) = tli.get_state().await;
     let flush_lsn = tli.get_flush_lsn().await;
@@ -233,9 +247,11 @@ async fn timeline_delete_handler(mut request: Request<Body>) -> Result<Response<
     let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
     check_permission(&request, Some(ttid.tenant_id))?;
     ensure_no_body(&mut request).await?;
+    let global_timelines = get_global_timelines(&request);
    // FIXME: `delete_force` can fail from both internal errors and bad requests. Add better
    // error handling here when we're able to.
-    let resp = GlobalTimelines::delete(&ttid, only_local)
+    let resp = global_timelines
+        .delete(&ttid, only_local)
         .await
         .map_err(ApiError::InternalServerError)?;
     json_response(StatusCode::OK, resp)
@@ -247,8 +263,9 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
     let data: pull_timeline::Request = json_request(&mut request).await?;
     let conf = get_conf(&request);
+    let global_timelines = get_global_timelines(&request);

-    let resp = pull_timeline::handle_request(data, conf.sk_auth_token.clone())
+    let resp = pull_timeline::handle_request(data, conf.sk_auth_token.clone(), global_timelines)
         .await
         .map_err(ApiError::InternalServerError)?;
     json_response(StatusCode::OK, resp)
@@ -263,7 +280,8 @@ async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Bo
     );
     check_permission(&request, Some(ttid.tenant_id))?;

-    let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
+    let global_timelines = get_global_timelines(&request);
+    let tli = global_timelines.get(ttid).map_err(ApiError::from)?;

     // To stream the body use wrap_stream which wants Stream of Result<Bytes>,
     // so create the chan and write to it in another task.
@@ -293,19 +311,19 @@ async fn timeline_copy_handler(mut request: Request<Body>) -> Result<Response<Bo
     check_permission(&request, None)?;

     let request_data: TimelineCopyRequest = json_request(&mut request).await?;
-    let ttid = TenantTimelineId::new(
+    let source_ttid = TenantTimelineId::new(
         parse_request_param(&request, "tenant_id")?,
         parse_request_param(&request, "source_timeline_id")?,
     );
-    let source = GlobalTimelines::get(ttid)?;
+    let global_timelines = get_global_timelines(&request);

     copy_timeline::handle_request(copy_timeline::Request{
-        source,
+        source_ttid,
         until_lsn: request_data.until_lsn,
-        destination_ttid: TenantTimelineId::new(ttid.tenant_id, request_data.target_timeline_id),
-    })
-    .instrument(info_span!("copy_timeline", from=%ttid, to=%request_data.target_timeline_id, until_lsn=%request_data.until_lsn))
+        destination_ttid: TenantTimelineId::new(source_ttid.tenant_id, request_data.target_timeline_id),
+    }, global_timelines)
+    .instrument(info_span!("copy_timeline", from=%source_ttid, to=%request_data.target_timeline_id, until_lsn=%request_data.until_lsn))
     .await
     .map_err(ApiError::InternalServerError)?;
@@ -322,7 +340,8 @@ async fn patch_control_file_handler(
         parse_request_param(&request, "timeline_id")?,
     );

-    let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
+    let global_timelines = get_global_timelines(&request);
+    let tli = global_timelines.get(ttid).map_err(ApiError::from)?;

     let patch_request: patch_control_file::Request = json_request(&mut request).await?;
     let response = patch_control_file::handle_request(tli, patch_request)
@@ -341,7 +360,8 @@ async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<
         parse_request_param(&request, "timeline_id")?,
     );

-    let tli = GlobalTimelines::get(ttid)?;
+    let global_timelines = get_global_timelines(&request);
+    let tli = global_timelines.get(ttid)?;
     tli.write_shared_state()
.await .await
.sk .sk
@@ -359,6 +379,7 @@ async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body
); );
check_permission(&request, Some(ttid.tenant_id))?; check_permission(&request, Some(ttid.tenant_id))?;
let global_timelines = get_global_timelines(&request);
let from_lsn: Option<Lsn> = parse_query_param(&request, "from_lsn")?; let from_lsn: Option<Lsn> = parse_query_param(&request, "from_lsn")?;
let until_lsn: Option<Lsn> = parse_query_param(&request, "until_lsn")?; let until_lsn: Option<Lsn> = parse_query_param(&request, "until_lsn")?;
@@ -371,7 +392,7 @@ async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body
)))?, )))?,
}; };
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?; let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
let tli = tli let tli = tli
.wal_residence_guard() .wal_residence_guard()
.await .await
@@ -393,7 +414,8 @@ async fn timeline_backup_partial_reset(request: Request<Body>) -> Result<Respons
); );
check_permission(&request, Some(ttid.tenant_id))?; check_permission(&request, Some(ttid.tenant_id))?;
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?; let global_timelines = get_global_timelines(&request);
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
let response = tli let response = tli
.backup_partial_reset() .backup_partial_reset()
@@ -415,7 +437,8 @@ async fn timeline_term_bump_handler(
let request_data: TimelineTermBumpRequest = json_request(&mut request).await?; let request_data: TimelineTermBumpRequest = json_request(&mut request).await?;
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?; let global_timelines = get_global_timelines(&request);
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
let response = tli let response = tli
.term_bump(request_data.term) .term_bump(request_data.term)
.await .await
@@ -452,7 +475,8 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
standby_horizon: sk_info.standby_horizon.0, standby_horizon: sk_info.standby_horizon.0,
}; };
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?; let global_timelines = get_global_timelines(&request);
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
tli.record_safekeeper_info(proto_sk_info) tli.record_safekeeper_info(proto_sk_info)
.await .await
.map_err(ApiError::InternalServerError)?; .map_err(ApiError::InternalServerError)?;
@@ -506,6 +530,8 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
let dump_term_history = dump_term_history.unwrap_or(true); let dump_term_history = dump_term_history.unwrap_or(true);
let dump_wal_last_modified = dump_wal_last_modified.unwrap_or(dump_all); let dump_wal_last_modified = dump_wal_last_modified.unwrap_or(dump_all);
let global_timelines = get_global_timelines(&request);
let args = debug_dump::Args { let args = debug_dump::Args {
dump_all, dump_all,
dump_control_file, dump_control_file,
@@ -517,7 +543,7 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
timeline_id, timeline_id,
}; };
let resp = debug_dump::build(args) let resp = debug_dump::build(args, global_timelines)
.await .await
.map_err(ApiError::InternalServerError)?; .map_err(ApiError::InternalServerError)?;
@@ -570,7 +596,10 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
} }
/// Safekeeper http router. /// Safekeeper http router.
pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> { pub fn make_router(
conf: Arc<SafeKeeperConf>,
global_timelines: Arc<GlobalTimelines>,
) -> RouterBuilder<hyper::Body, ApiError> {
let mut router = endpoint::make_router(); let mut router = endpoint::make_router();
if conf.http_auth.is_some() { if conf.http_auth.is_some() {
router = router.middleware(auth_middleware(|request| { router = router.middleware(auth_middleware(|request| {
@@ -592,7 +621,8 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
// located nearby (/safekeeper/src/http/openapi_spec.yaml). // located nearby (/safekeeper/src/http/openapi_spec.yaml).
let auth = conf.http_auth.clone(); let auth = conf.http_auth.clone();
router router
.data(Arc::new(conf)) .data(conf)
.data(global_timelines)
.data(auth) .data(auth)
.get("/metrics", |r| request_span(r, prometheus_metrics_handler)) .get("/metrics", |r| request_span(r, prometheus_metrics_handler))
.get("/profile/cpu", |r| request_span(r, profile_cpu_handler)) .get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
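The `make_router` change above threads shared state into the router explicitly instead of wrapping it at the call site or reading a global static. A minimal, self-contained sketch of that dependency-injection pattern with `Arc` (the `Config` and `Registry` types here are illustrative stand-ins, not the real `SafeKeeperConf` or `GlobalTimelines`):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Illustrative stand-in for SafeKeeperConf.
struct Config {
    workdir: String,
}

// Illustrative stand-in for GlobalTimelines: shared mutable state
// behind a sync lock, handed out via Arc instead of a static.
struct Registry {
    state: Mutex<HashMap<u64, String>>,
    conf: Arc<Config>,
}

impl Registry {
    fn new(conf: Arc<Config>) -> Self {
        Registry { state: Mutex::new(HashMap::new()), conf }
    }
    fn insert(&self, id: u64, name: &str) {
        self.state.lock().unwrap().insert(id, name.to_string());
    }
    fn count(&self) -> usize {
        self.state.lock().unwrap().len()
    }
    fn workdir(&self) -> &str {
        &self.conf.workdir
    }
}

// Analogous to make_router(conf, global_timelines): the component
// receives a cheap Arc clone rather than touching a global.
fn make_component(registry: Arc<Registry>) -> impl Fn() -> usize {
    move || registry.count()
}

fn main() {
    let conf = Arc::new(Config { workdir: "/tmp/sk".to_string() });
    let registry = Arc::new(Registry::new(conf));
    let handler = make_component(registry.clone());
    registry.insert(1, "timeline-1");
    // Both handles observe the same underlying map.
    println!("{} timeline(s) under {}", handler(), registry.workdir());
}
```

Because every consumer holds an `Arc` to the same instance, tests can construct isolated registries instead of sharing one process-wide static, which is the main point of the refactor.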

View File

@@ -11,7 +11,6 @@ use postgres_backend::QueryError;
 use serde::{Deserialize, Serialize};
 use tokio::io::{AsyncRead, AsyncWrite};
 use tracing::*;
-use utils::id::TenantTimelineId;
 use crate::handler::SafekeeperPostgresHandler;
 use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo};
@@ -21,7 +20,6 @@ use crate::safekeeper::{
 use crate::safekeeper::{Term, TermHistory, TermLsn};
 use crate::state::TimelinePersistentState;
 use crate::timeline::WalResidentTimeline;
-use crate::GlobalTimelines;
 use postgres_backend::PostgresBackend;
 use postgres_ffi::encode_logical_message;
 use postgres_ffi::WAL_SEGMENT_SIZE;
@@ -70,7 +68,7 @@ pub async fn handle_json_ctrl<IO: AsyncRead + AsyncWrite + Unpin>(
     info!("JSON_CTRL request: {append_request:?}");
     // need to init safekeeper state before AppendRequest
-    let tli = prepare_safekeeper(spg.ttid, append_request.pg_version).await?;
+    let tli = prepare_safekeeper(spg, append_request.pg_version).await?;
     // if send_proposer_elected is true, we need to update local history
     if append_request.send_proposer_elected {
@@ -99,20 +97,22 @@ pub async fn handle_json_ctrl<IO: AsyncRead + AsyncWrite + Unpin>(
 /// Prepare safekeeper to process append requests without crashes,
 /// by sending ProposerGreeting with default server.wal_seg_size.
 async fn prepare_safekeeper(
-    ttid: TenantTimelineId,
+    spg: &SafekeeperPostgresHandler,
     pg_version: u32,
 ) -> anyhow::Result<WalResidentTimeline> {
-    let tli = GlobalTimelines::create(
-        ttid,
-        ServerInfo {
-            pg_version,
-            wal_seg_size: WAL_SEGMENT_SIZE as u32,
-            system_id: 0,
-        },
-        Lsn::INVALID,
-        Lsn::INVALID,
-    )
-    .await?;
+    let tli = spg
+        .global_timelines
+        .create(
+            spg.ttid,
+            ServerInfo {
+                pg_version,
+                wal_seg_size: WAL_SEGMENT_SIZE as u32,
+                system_id: 0,
+            },
+            Lsn::INVALID,
+            Lsn::INVALID,
+        )
+        .await?;
     tli.wal_residence_guard().await
 }

View File

@@ -455,6 +455,7 @@ pub struct FullTimelineInfo {
 /// Collects metrics for all active timelines.
 pub struct TimelineCollector {
+    global_timelines: Arc<GlobalTimelines>,
     descs: Vec<Desc>,
     commit_lsn: GenericGaugeVec<AtomicU64>,
     backup_lsn: GenericGaugeVec<AtomicU64>,
@@ -478,14 +479,8 @@ pub struct TimelineCollector {
     active_timelines_count: IntGauge,
 }
-impl Default for TimelineCollector {
-    fn default() -> Self {
-        Self::new()
-    }
-}
 impl TimelineCollector {
-    pub fn new() -> TimelineCollector {
+    pub fn new(global_timelines: Arc<GlobalTimelines>) -> TimelineCollector {
         let mut descs = Vec::new();
         let commit_lsn = GenericGaugeVec::new(
@@ -676,6 +671,7 @@ impl TimelineCollector {
         descs.extend(active_timelines_count.desc().into_iter().cloned());
         TimelineCollector {
+            global_timelines,
             descs,
             commit_lsn,
             backup_lsn,
@@ -728,17 +724,18 @@ impl Collector for TimelineCollector {
         self.written_wal_seconds.reset();
         self.flushed_wal_seconds.reset();
-        let timelines_count = GlobalTimelines::get_all().len();
+        let timelines_count = self.global_timelines.get_all().len();
         let mut active_timelines_count = 0;
         // Prometheus Collector is sync, and data is stored under async lock. To
         // bridge the gap with a crutch, collect data in spawned thread with
         // local tokio runtime.
+        let global_timelines = self.global_timelines.clone();
         let infos = std::thread::spawn(|| {
             let rt = tokio::runtime::Builder::new_current_thread()
                 .build()
                 .expect("failed to create rt");
-            rt.block_on(collect_timeline_metrics())
+            rt.block_on(collect_timeline_metrics(global_timelines))
         })
         .join()
         .expect("collect_timeline_metrics thread panicked");
@@ -857,9 +854,9 @@ impl Collector for TimelineCollector {
     }
 }
-async fn collect_timeline_metrics() -> Vec<FullTimelineInfo> {
+async fn collect_timeline_metrics(global_timelines: Arc<GlobalTimelines>) -> Vec<FullTimelineInfo> {
     let mut res = vec![];
-    let active_timelines = GlobalTimelines::get_global_broker_active_set().get_all();
+    let active_timelines = global_timelines.get_global_broker_active_set().get_all();
     for tli in active_timelines {
         if let Some(info) = tli.info_for_metrics().await {

View File

@@ -409,8 +409,9 @@ pub struct DebugDumpResponse {
 pub async fn handle_request(
     request: Request,
     sk_auth_token: Option<SecretString>,
+    global_timelines: Arc<GlobalTimelines>,
 ) -> Result<Response> {
-    let existing_tli = GlobalTimelines::get(TenantTimelineId::new(
+    let existing_tli = global_timelines.get(TenantTimelineId::new(
         request.tenant_id,
         request.timeline_id,
     ));
@@ -453,13 +454,14 @@ pub async fn handle_request(
     assert!(status.tenant_id == request.tenant_id);
     assert!(status.timeline_id == request.timeline_id);
-    pull_timeline(status, safekeeper_host, sk_auth_token).await
+    pull_timeline(status, safekeeper_host, sk_auth_token, global_timelines).await
 }
 async fn pull_timeline(
     status: TimelineStatus,
     host: String,
     sk_auth_token: Option<SecretString>,
+    global_timelines: Arc<GlobalTimelines>,
 ) -> Result<Response> {
     let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
     info!(
@@ -472,7 +474,7 @@ async fn pull_timeline(
         status.acceptor_state.epoch
     );
-    let conf = &GlobalTimelines::get_global_config();
+    let conf = &global_timelines.get_global_config();
     let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
@@ -531,7 +533,9 @@ async fn pull_timeline(
     assert!(status.commit_lsn <= status.flush_lsn);
     // Finally, load the timeline.
-    let _tli = GlobalTimelines::load_temp_timeline(ttid, &tli_dir_path, false).await?;
+    let _tli = global_timelines
+        .load_temp_timeline(ttid, &tli_dir_path, false)
+        .await?;
     Ok(Response {
         safekeeper_host: host,

View File

@@ -267,6 +267,7 @@ impl SafekeeperPostgresHandler {
             pgb_reader: &mut pgb_reader,
             peer_addr,
             acceptor_handle: &mut acceptor_handle,
+            global_timelines: self.global_timelines.clone(),
         };
         // Read first message and create timeline if needed.
@@ -331,6 +332,7 @@ struct NetworkReader<'a, IO> {
     // WalAcceptor is spawned when we learn server info from walproposer and
     // create timeline; handle is put here.
     acceptor_handle: &'a mut Option<JoinHandle<anyhow::Result<()>>>,
+    global_timelines: Arc<GlobalTimelines>,
 }
 impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
@@ -350,10 +352,11 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
                 system_id: greeting.system_id,
                 wal_seg_size: greeting.wal_seg_size,
             };
-            let tli =
-                GlobalTimelines::create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID)
-                    .await
-                    .context("create timeline")?;
+            let tli = self
+                .global_timelines
+                .create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID)
+                .await
+                .context("create timeline")?;
             tli.wal_residence_guard().await?
         }
         _ => {

View File

@@ -10,7 +10,6 @@ use crate::timeline::WalResidentTimeline;
 use crate::wal_reader_stream::WalReaderStreamBuilder;
 use crate::wal_service::ConnectionId;
 use crate::wal_storage::WalReader;
-use crate::GlobalTimelines;
 use anyhow::{bail, Context as AnyhowContext};
 use bytes::Bytes;
 use futures::future::Either;
@@ -400,7 +399,10 @@ impl SafekeeperPostgresHandler {
         start_pos: Lsn,
         term: Option<Term>,
     ) -> Result<(), QueryError> {
-        let tli = GlobalTimelines::get(self.ttid).map_err(|e| QueryError::Other(e.into()))?;
+        let tli = self
+            .global_timelines
+            .get(self.ttid)
+            .map_err(|e| QueryError::Other(e.into()))?;
         let residence_guard = tli.wal_residence_guard().await?;
         if let Err(end) = self

View File

@@ -44,8 +44,8 @@ use crate::wal_backup_partial::PartialRemoteSegment;
 use crate::metrics::{FullTimelineInfo, WalStorageMetrics, MISC_OPERATION_SECONDS};
 use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
+use crate::SafeKeeperConf;
 use crate::{debug_dump, timeline_manager, wal_storage};
-use crate::{GlobalTimelines, SafeKeeperConf};
 /// Things safekeeper should know about timeline state on peers.
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -467,6 +467,7 @@ pub struct Timeline {
     walreceivers: Arc<WalReceivers>,
     timeline_dir: Utf8PathBuf,
     manager_ctl: ManagerCtl,
+    conf: Arc<SafeKeeperConf>,
     /// Hold this gate from code that depends on the Timeline's non-shut-down state. While holding
     /// this gate, you must respect [`Timeline::cancel`]
@@ -489,6 +490,7 @@ impl Timeline {
         timeline_dir: &Utf8Path,
         remote_path: &RemotePath,
         shared_state: SharedState,
+        conf: Arc<SafeKeeperConf>,
     ) -> Arc<Self> {
         let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
             watch::channel(shared_state.sk.state().commit_lsn);
@@ -516,6 +518,7 @@ impl Timeline {
             gate: Default::default(),
             cancel: CancellationToken::default(),
             manager_ctl: ManagerCtl::new(),
+            conf,
             broker_active: AtomicBool::new(false),
             wal_backup_active: AtomicBool::new(false),
             last_removed_segno: AtomicU64::new(0),
@@ -524,11 +527,14 @@ impl Timeline {
     }
     /// Load existing timeline from disk.
-    pub fn load_timeline(conf: &SafeKeeperConf, ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
+    pub fn load_timeline(
+        conf: Arc<SafeKeeperConf>,
+        ttid: TenantTimelineId,
+    ) -> Result<Arc<Timeline>> {
         let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
-        let shared_state = SharedState::restore(conf, &ttid)?;
-        let timeline_dir = get_timeline_dir(conf, &ttid);
+        let shared_state = SharedState::restore(conf.as_ref(), &ttid)?;
+        let timeline_dir = get_timeline_dir(conf.as_ref(), &ttid);
         let remote_path = remote_timeline_path(&ttid)?;
         Ok(Timeline::new(
@@ -536,6 +542,7 @@ impl Timeline {
             &timeline_dir,
             &remote_path,
             shared_state,
+            conf,
         ))
     }
@@ -604,8 +611,7 @@ impl Timeline {
         // it is cancelled, so WAL storage won't be opened again.
         shared_state.sk.close_wal_store();
-        let conf = GlobalTimelines::get_global_config();
-        if !only_local && conf.is_wal_backup_enabled() {
+        if !only_local && self.conf.is_wal_backup_enabled() {
             // Note: we concurrently delete remote storage data from multiple
             // safekeepers. That's ok, s3 replies 200 if object doesn't exist and we
             // do some retries anyway.
@@ -951,7 +957,7 @@ impl WalResidentTimeline {
     pub async fn get_walreader(&self, start_lsn: Lsn) -> Result<WalReader> {
         let (_, persisted_state) = self.get_state().await;
-        let enable_remote_read = GlobalTimelines::get_global_config().is_wal_backup_enabled();
+        let enable_remote_read = self.conf.is_wal_backup_enabled();
         WalReader::new(
             &self.ttid,
@@ -1061,7 +1067,6 @@ impl ManagerTimeline {
     /// Try to switch state Offloaded->Present.
     pub(crate) async fn switch_to_present(&self) -> anyhow::Result<()> {
-        let conf = GlobalTimelines::get_global_config();
         let mut shared = self.write_shared_state().await;
         // trying to restore WAL storage
@@ -1069,7 +1074,7 @@ impl ManagerTimeline {
             &self.ttid,
             &self.timeline_dir,
             shared.sk.state(),
-            conf.no_sync,
+            self.conf.no_sync,
         )?;
         // updating control file
@@ -1096,7 +1101,7 @@ impl ManagerTimeline {
         // now we can switch shared.sk to Present, shouldn't fail
         let prev_sk = std::mem::replace(&mut shared.sk, StateSK::Empty);
         let cfile_state = prev_sk.take_state();
-        shared.sk = StateSK::Loaded(SafeKeeper::new(cfile_state, wal_store, conf.my_id)?);
+        shared.sk = StateSK::Loaded(SafeKeeper::new(cfile_state, wal_store, self.conf.my_id)?);
         Ok(())
     }
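The timeline.rs changes above replace global `get_global_config()` lookups and full `SafeKeeperConf` clones with an `Arc<SafeKeeperConf>` stored on each `Timeline`. A minimal sketch of why this is cheap (the `Conf` struct here is a hypothetical stand-in, not the real `SafeKeeperConf`):

```rust
use std::sync::Arc;

// Illustrative stand-in for SafeKeeperConf; field names mirror the diff.
struct Conf {
    no_sync: bool,
    my_id: u64,
}

fn main() {
    let conf = Arc::new(Conf { no_sync: false, my_id: 7 });

    // Cloning the Arc bumps a reference count; the Conf itself is not copied,
    // unlike cloning the struct directly.
    let for_timeline = Arc::clone(&conf);
    assert!(Arc::ptr_eq(&conf, &for_timeline));

    // Borrow the inner value where a &Conf is expected, as in
    // SharedState::restore(conf.as_ref(), ...) above.
    let inner: &Conf = for_timeline.as_ref();
    assert!(!inner.no_sync);
    println!("shared config for safekeeper {}", inner.my_id);
}
```

Since the config is read-only after startup, one allocation shared through `Arc` avoids both the lock taken by the old global accessor and the per-call deep clone.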

View File

@@ -13,7 +13,6 @@ use crate::{control_file, wal_storage, SafeKeeperConf};
 use anyhow::{bail, Context, Result};
 use camino::Utf8PathBuf;
 use camino_tempfile::Utf8TempDir;
-use once_cell::sync::Lazy;
 use serde::Serialize;
 use std::collections::HashMap;
 use std::str::FromStr;
@@ -42,23 +41,16 @@ struct GlobalTimelinesState {
     // this map is dropped on restart.
     tombstones: HashMap<TenantTimelineId, Instant>,
-    conf: Option<SafeKeeperConf>,
+    conf: Arc<SafeKeeperConf>,
     broker_active_set: Arc<TimelinesSet>,
     global_rate_limiter: RateLimiter,
 }
 impl GlobalTimelinesState {
-    /// Get configuration, which must be set once during init.
-    fn get_conf(&self) -> &SafeKeeperConf {
-        self.conf
-            .as_ref()
-            .expect("GlobalTimelinesState conf is not initialized")
-    }
     /// Get dependencies for a timeline constructor.
-    fn get_dependencies(&self) -> (SafeKeeperConf, Arc<TimelinesSet>, RateLimiter) {
+    fn get_dependencies(&self) -> (Arc<SafeKeeperConf>, Arc<TimelinesSet>, RateLimiter) {
         (
-            self.get_conf().clone(),
+            self.conf.clone(),
             self.broker_active_set.clone(),
             self.global_rate_limiter.clone(),
         )
@@ -82,35 +74,39 @@ impl GlobalTimelinesState {
     }
 }
-static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
-    Mutex::new(GlobalTimelinesState {
-        timelines: HashMap::new(),
-        tombstones: HashMap::new(),
-        conf: None,
-        broker_active_set: Arc::new(TimelinesSet::default()),
-        global_rate_limiter: RateLimiter::new(1, 1),
-    })
-});
-/// A zero-sized struct used to manage access to the global timelines map.
-pub struct GlobalTimelines;
+/// A struct used to manage access to the global timelines map.
+pub struct GlobalTimelines {
+    state: Mutex<GlobalTimelinesState>,
+}
 impl GlobalTimelines {
+    /// Create a new instance of the global timelines map.
+    pub fn new(conf: Arc<SafeKeeperConf>) -> Self {
+        Self {
+            state: Mutex::new(GlobalTimelinesState {
+                timelines: HashMap::new(),
+                tombstones: HashMap::new(),
+                conf,
+                broker_active_set: Arc::new(TimelinesSet::default()),
+                global_rate_limiter: RateLimiter::new(1, 1),
+            }),
+        }
+    }
     /// Inject dependencies needed for the timeline constructors and load all timelines to memory.
-    pub async fn init(conf: SafeKeeperConf) -> Result<()> {
+    pub async fn init(&self) -> Result<()> {
         // clippy isn't smart enough to understand that drop(state) releases the
         // lock, so use explicit block
         let tenants_dir = {
-            let mut state = TIMELINES_STATE.lock().unwrap();
+            let mut state = self.state.lock().unwrap();
             state.global_rate_limiter = RateLimiter::new(
-                conf.partial_backup_concurrency,
+                state.conf.partial_backup_concurrency,
                 DEFAULT_EVICTION_CONCURRENCY,
             );
-            state.conf = Some(conf);
             // Iterate through all directories and load tenants for all directories
             // named as a valid tenant_id.
-            state.get_conf().workdir.clone()
+            state.conf.workdir.clone()
         };
         let mut tenant_count = 0;
         for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
@@ -122,7 +118,7 @@ impl GlobalTimelines {
                 TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
             {
                 tenant_count += 1;
-                GlobalTimelines::load_tenant_timelines(tenant_id).await?;
+                self.load_tenant_timelines(tenant_id).await?;
             }
         }
         Err(e) => error!(
@@ -135,7 +131,7 @@ impl GlobalTimelines {
         info!(
             "found {} tenants directories, successfully loaded {} timelines",
             tenant_count,
-            TIMELINES_STATE.lock().unwrap().timelines.len()
+            self.state.lock().unwrap().timelines.len()
         );
         Ok(())
     }
@@ -143,13 +139,13 @@ impl GlobalTimelines {
     /// Loads all timelines for the given tenant to memory. Returns fs::read_dir
     /// errors if any.
     ///
-    /// It is async, but TIMELINES_STATE lock is sync and there is no important
+    /// It is async, but self.state lock is sync and there is no important
     /// reason to make it async (it is always held for a short while), so we
     /// just lock and unlock it for each timeline -- this function is called
    /// during init when nothing else is running, so this is fine.
-    async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
+    async fn load_tenant_timelines(&self, tenant_id: TenantId) -> Result<()> {
         let (conf, broker_active_set, partial_backup_rate_limiter) = {
-            let state = TIMELINES_STATE.lock().unwrap();
+            let state = self.state.lock().unwrap();
             state.get_dependencies()
         };
@@ -163,10 +159,10 @@ impl GlobalTimelines {
                 TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
             {
                 let ttid = TenantTimelineId::new(tenant_id, timeline_id);
-                match Timeline::load_timeline(&conf, ttid) {
+                match Timeline::load_timeline(conf.clone(), ttid) {
                     Ok(tli) => {
                         let mut shared_state = tli.write_shared_state().await;
-                        TIMELINES_STATE
+                        self.state
                             .lock()
                             .unwrap()
                             .timelines
@@ -200,29 +196,30 @@ impl GlobalTimelines {
     }
     /// Get the number of timelines in the map.
-    pub fn timelines_count() -> usize {
-        TIMELINES_STATE.lock().unwrap().timelines.len()
+    pub fn timelines_count(&self) -> usize {
+        self.state.lock().unwrap().timelines.len()
     }
     /// Get the global safekeeper config.
-    pub fn get_global_config() -> SafeKeeperConf {
-        TIMELINES_STATE.lock().unwrap().get_conf().clone()
+    pub fn get_global_config(&self) -> Arc<SafeKeeperConf> {
+        self.state.lock().unwrap().conf.clone()
     }
-    pub fn get_global_broker_active_set() -> Arc<TimelinesSet> {
-        TIMELINES_STATE.lock().unwrap().broker_active_set.clone()
+    pub fn get_global_broker_active_set(&self) -> Arc<TimelinesSet> {
+        self.state.lock().unwrap().broker_active_set.clone()
     }
     /// Create a new timeline with the given id. If the timeline already exists, returns
     /// an existing timeline.
     pub(crate) async fn create(
+        &self,
         ttid: TenantTimelineId,
         server_info: ServerInfo,
         commit_lsn: Lsn,
         local_start_lsn: Lsn,
     ) -> Result<Arc<Timeline>> {
         let (conf, _, _) = {
-            let state = TIMELINES_STATE.lock().unwrap();
+            let state = self.state.lock().unwrap();
             if let Ok(timeline) = state.get(&ttid) {
                 // Timeline already exists, return it.
                 return Ok(timeline);
@@ -245,7 +242,7 @@ impl GlobalTimelines {
         let state =
             TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn)?;
         control_file::FileStorage::create_new(&tmp_dir_path, state, conf.no_sync).await?;
-        let timeline = GlobalTimelines::load_temp_timeline(ttid, &tmp_dir_path, true).await?;
+        let timeline = self.load_temp_timeline(ttid, &tmp_dir_path, true).await?;
         Ok(timeline)
     }
@@ -261,13 +258,14 @@ impl GlobalTimelines {
     /// 2) move the directory and load the timeline
     /// 3) take lock again and insert the timeline into the global map.
     pub async fn load_temp_timeline(
+        &self,
         ttid: TenantTimelineId,
         tmp_path: &Utf8PathBuf,
         check_tombstone: bool,
     ) -> Result<Arc<Timeline>> {
         // Check for existence and mark that we're creating it.
         let (conf, broker_active_set, partial_backup_rate_limiter) = {
-            let mut state = TIMELINES_STATE.lock().unwrap();
+            let mut state = self.state.lock().unwrap();
             match state.timelines.get(&ttid) {
                 Some(GlobalMapTimeline::CreationInProgress) => {
                     bail!(TimelineError::CreationInProgress(ttid));
@@ -295,10 +293,10 @@ impl GlobalTimelines {
         };
         // Do the actual move and reflect the result in the map.
-        match GlobalTimelines::install_temp_timeline(ttid, tmp_path, &conf).await {
+        match GlobalTimelines::install_temp_timeline(ttid, tmp_path, conf.clone()).await {
             Ok(timeline) => {
                 let mut timeline_shared_state = timeline.write_shared_state().await;
-                let mut state = TIMELINES_STATE.lock().unwrap();
+                let mut state = self.state.lock().unwrap();
                 assert!(matches!(
                     state.timelines.get(&ttid),
                     Some(GlobalMapTimeline::CreationInProgress)
@@ -319,7 +317,7 @@ impl GlobalTimelines {
             }
             Err(e) => {
                 // Init failed, remove the marker from the map
-                let mut state = TIMELINES_STATE.lock().unwrap();
+                let mut state = self.state.lock().unwrap();
                 assert!(matches!(
                     state.timelines.get(&ttid),
                     Some(GlobalMapTimeline::CreationInProgress)
@@ -334,10 +332,10 @@ impl GlobalTimelines {
async fn install_temp_timeline( async fn install_temp_timeline(
ttid: TenantTimelineId, ttid: TenantTimelineId,
tmp_path: &Utf8PathBuf, tmp_path: &Utf8PathBuf,
conf: &SafeKeeperConf, conf: Arc<SafeKeeperConf>,
) -> Result<Arc<Timeline>> { ) -> Result<Arc<Timeline>> {
let tenant_path = get_tenant_dir(conf, &ttid.tenant_id); let tenant_path = get_tenant_dir(conf.as_ref(), &ttid.tenant_id);
let timeline_path = get_timeline_dir(conf, &ttid); let timeline_path = get_timeline_dir(conf.as_ref(), &ttid);
// We must have already checked that timeline doesn't exist in the map, // We must have already checked that timeline doesn't exist in the map,
// but there might be existing datadir: if timeline is corrupted it is // but there might be existing datadir: if timeline is corrupted it is
@@ -382,9 +380,9 @@ impl GlobalTimelines {
/// Get a timeline from the global map. If it's not present, it doesn't exist on disk, /// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
/// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid, /// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid,
/// i.e. loaded in memory and not cancelled. /// i.e. loaded in memory and not cancelled.
pub(crate) fn get(ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> { pub(crate) fn get(&self, ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
let tli_res = { let tli_res = {
let state = TIMELINES_STATE.lock().unwrap(); let state = self.state.lock().unwrap();
state.get(&ttid) state.get(&ttid)
}; };
match tli_res { match tli_res {
@@ -399,8 +397,8 @@ impl GlobalTimelines {
} }
/// Returns all timelines. This is used for background timeline processes. /// Returns all timelines. This is used for background timeline processes.
pub fn get_all() -> Vec<Arc<Timeline>> { pub fn get_all(&self) -> Vec<Arc<Timeline>> {
let global_lock = TIMELINES_STATE.lock().unwrap(); let global_lock = self.state.lock().unwrap();
global_lock global_lock
.timelines .timelines
.values() .values()
@@ -419,8 +417,8 @@ impl GlobalTimelines {
/// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant, /// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant,
/// and that's why it can return cancelled timelines, to retry deleting them. /// and that's why it can return cancelled timelines, to retry deleting them.
fn get_all_for_tenant(tenant_id: TenantId) -> Vec<Arc<Timeline>> { fn get_all_for_tenant(&self, tenant_id: TenantId) -> Vec<Arc<Timeline>> {
let global_lock = TIMELINES_STATE.lock().unwrap(); let global_lock = self.state.lock().unwrap();
global_lock global_lock
.timelines .timelines
.values() .values()
@@ -435,11 +433,12 @@ impl GlobalTimelines {
/// Cancels timeline, then deletes the corresponding data directory. /// Cancels timeline, then deletes the corresponding data directory.
/// If only_local, doesn't remove WAL segments in remote storage. /// If only_local, doesn't remove WAL segments in remote storage.
pub(crate) async fn delete( pub(crate) async fn delete(
&self,
ttid: &TenantTimelineId, ttid: &TenantTimelineId,
only_local: bool, only_local: bool,
) -> Result<TimelineDeleteForceResult> { ) -> Result<TimelineDeleteForceResult> {
let tli_res = { let tli_res = {
let state = TIMELINES_STATE.lock().unwrap(); let state = self.state.lock().unwrap();
if state.tombstones.contains_key(ttid) { if state.tombstones.contains_key(ttid) {
// Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do. // Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
@@ -472,7 +471,7 @@ impl GlobalTimelines {
} }
Err(_) => { Err(_) => {
// Timeline is not memory, but it may still exist on disk in broken state. // Timeline is not memory, but it may still exist on disk in broken state.
let dir_path = get_timeline_dir(TIMELINES_STATE.lock().unwrap().get_conf(), ttid); let dir_path = get_timeline_dir(self.state.lock().unwrap().conf.as_ref(), ttid);
let dir_existed = delete_dir(dir_path)?; let dir_existed = delete_dir(dir_path)?;
Ok(TimelineDeleteForceResult { Ok(TimelineDeleteForceResult {
@@ -485,7 +484,7 @@ impl GlobalTimelines {
// Finalize deletion, by dropping Timeline objects and storing smaller tombstones. The tombstones // Finalize deletion, by dropping Timeline objects and storing smaller tombstones. The tombstones
// are used to prevent still-running computes from re-creating the same timeline when they send data, // are used to prevent still-running computes from re-creating the same timeline when they send data,
// and to speed up repeated deletion calls by avoiding re-listing objects. // and to speed up repeated deletion calls by avoiding re-listing objects.
TIMELINES_STATE.lock().unwrap().delete(*ttid); self.state.lock().unwrap().delete(*ttid);
result result
} }
@@ -497,17 +496,18 @@ impl GlobalTimelines {
/// ///
/// If only_local, doesn't remove WAL segments in remote storage. /// If only_local, doesn't remove WAL segments in remote storage.
pub async fn delete_force_all_for_tenant( pub async fn delete_force_all_for_tenant(
&self,
tenant_id: &TenantId, tenant_id: &TenantId,
only_local: bool, only_local: bool,
) -> Result<HashMap<TenantTimelineId, TimelineDeleteForceResult>> { ) -> Result<HashMap<TenantTimelineId, TimelineDeleteForceResult>> {
info!("deleting all timelines for tenant {}", tenant_id); info!("deleting all timelines for tenant {}", tenant_id);
let to_delete = Self::get_all_for_tenant(*tenant_id); let to_delete = self.get_all_for_tenant(*tenant_id);
let mut err = None; let mut err = None;
let mut deleted = HashMap::new(); let mut deleted = HashMap::new();
for tli in &to_delete { for tli in &to_delete {
match Self::delete(&tli.ttid, only_local).await { match self.delete(&tli.ttid, only_local).await {
Ok(result) => { Ok(result) => {
deleted.insert(tli.ttid, result); deleted.insert(tli.ttid, result);
} }
@@ -529,15 +529,15 @@ impl GlobalTimelines {
// so the directory may be not empty. In this case timelines will have bad state // so the directory may be not empty. In this case timelines will have bad state
// and timeline background jobs can panic. // and timeline background jobs can panic.
delete_dir(get_tenant_dir( delete_dir(get_tenant_dir(
TIMELINES_STATE.lock().unwrap().get_conf(), self.state.lock().unwrap().conf.as_ref(),
tenant_id, tenant_id,
))?; ))?;
Ok(deleted) Ok(deleted)
} }
pub fn housekeeping(tombstone_ttl: &Duration) { pub fn housekeeping(&self, tombstone_ttl: &Duration) {
let mut state = TIMELINES_STATE.lock().unwrap(); let mut state = self.state.lock().unwrap();
// We keep tombstones long enough to have a good chance of preventing rogue computes from re-creating deleted // We keep tombstones long enough to have a good chance of preventing rogue computes from re-creating deleted
// timelines. If a compute kept running for longer than this TTL (or across a safekeeper restart) then they // timelines. If a compute kept running for longer than this TTL (or across a safekeeper restart) then they
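The mechanical pattern this file's hunks apply throughout — replacing the process-global `TIMELINES_STATE` with state owned by a `GlobalTimelines` instance and accessed through `&self` — can be sketched in a few lines of Python (a toy illustration of the refactor's shape, not Neon code):

```python
# Toy sketch: an instance owns its own lock-protected timeline map and is
# shared explicitly by callers, instead of every function reaching for a
# module-global singleton.
import threading

class GlobalTimelines:
    def __init__(self, conf):
        self._lock = threading.Lock()
        self._timelines = {}
        self.conf = conf  # shared config, analogous to Arc<SafeKeeperConf>

    def create(self, ttid):
        # Insert-or-get under the lock, like the CreationInProgress dance
        # above but without the error handling.
        with self._lock:
            return self._timelines.setdefault(ttid, {"ttid": ttid})

    def get_all(self):
        with self._lock:
            return list(self._timelines.values())

gt = GlobalTimelines(conf={"no_sync": False})
gt.create("tenant1/timeline1")
gt.create("tenant1/timeline2")
assert len(gt.get_all()) == 2
```

Because the instance is passed around explicitly (in the Rust code, as `Arc<GlobalTimelines>`), tests can construct independent instances instead of sharing one static.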


@@ -4,6 +4,7 @@
 //!
 use anyhow::{Context, Result};
 use postgres_backend::QueryError;
+use std::sync::Arc;
 use std::time::Duration;
 use tokio::net::TcpStream;
 use tokio_io_timeout::TimeoutReader;
@@ -11,9 +12,9 @@ use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::{auth::Scope, measured_stream::MeasuredStream};
-use crate::handler::SafekeeperPostgresHandler;
 use crate::metrics::TrafficMetrics;
 use crate::SafeKeeperConf;
+use crate::{handler::SafekeeperPostgresHandler, GlobalTimelines};
 use postgres_backend::{AuthType, PostgresBackend};
 /// Accept incoming TCP connections and spawn them into a background thread.
@@ -22,9 +23,10 @@ use postgres_backend::{AuthType, PostgresBackend};
 /// to any tenant are allowed) or Tenant (only tokens giving access to specific
 /// tenant are allowed). Doesn't matter if auth is disabled in conf.
 pub async fn task_main(
-    conf: SafeKeeperConf,
+    conf: Arc<SafeKeeperConf>,
     pg_listener: std::net::TcpListener,
     allowed_auth_scope: Scope,
+    global_timelines: Arc<GlobalTimelines>,
 ) -> anyhow::Result<()> {
     // Tokio's from_std won't do this for us, per its comment.
     pg_listener.set_nonblocking(true)?;
@@ -37,10 +39,10 @@ pub async fn task_main(
         debug!("accepted connection from {}", peer_addr);
         let conf = conf.clone();
         let conn_id = issue_connection_id(&mut connection_count);
+        let global_timelines = global_timelines.clone();
         tokio::spawn(
             async move {
-                if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope).await {
+                if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope, global_timelines).await {
                     error!("connection handler exited: {}", err);
                 }
             }
@@ -53,9 +55,10 @@ pub async fn task_main(
 ///
 async fn handle_socket(
     socket: TcpStream,
-    conf: SafeKeeperConf,
+    conf: Arc<SafeKeeperConf>,
     conn_id: ConnectionId,
     allowed_auth_scope: Scope,
+    global_timelines: Arc<GlobalTimelines>,
 ) -> Result<(), QueryError> {
     socket.set_nodelay(true)?;
     let peer_addr = socket.peer_addr()?;
@@ -96,8 +99,13 @@ async fn handle_socket(
         Some(_) => AuthType::NeonJWT,
     };
     let auth_pair = auth_key.map(|key| (allowed_auth_scope, key));
-    let mut conn_handler =
-        SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone()), auth_pair);
+    let mut conn_handler = SafekeeperPostgresHandler::new(
+        conf,
+        conn_id,
+        Some(traffic_metrics.clone()),
+        auth_pair,
+        global_timelines,
+    );
     let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
     // libpq protocol between safekeeper and walproposer / pageserver
     // We don't use shutdown.


@@ -636,6 +636,13 @@ impl Persistence {
                 .into_boxed(),
         };
+        // Clear generation_pageserver if we are moving into a state where we won't have
+        // any attached pageservers.
+        let input_generation_pageserver = match input_placement_policy {
+            None | Some(PlacementPolicy::Attached(_)) => None,
+            Some(PlacementPolicy::Detached | PlacementPolicy::Secondary) => Some(None),
+        };
+
         #[derive(AsChangeset)]
         #[diesel(table_name = crate::schema::tenant_shards)]
         struct ShardUpdate {
@@ -643,6 +650,7 @@ impl Persistence {
             placement_policy: Option<String>,
             config: Option<String>,
             scheduling_policy: Option<String>,
+            generation_pageserver: Option<Option<i64>>,
         }
         let update = ShardUpdate {
@@ -655,6 +663,7 @@ impl Persistence {
                 .map(|c| serde_json::to_string(&c).unwrap()),
             scheduling_policy: input_scheduling_policy
                 .map(|p| serde_json::to_string(&p).unwrap()),
+            generation_pageserver: input_generation_pageserver,
         };
         query.set(update).execute(conn)?;
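The `Option<Option<i64>>` type of `generation_pageserver` encodes a tri-state update: leave the column untouched, clear it to NULL, or set it to a value. A minimal Python sketch of those semantics (illustrative only, not the Neon code; `UNSET` stands in for the outer `None`):

```python
# UNSET plays the role of the outer None in Option<Option<i64>>.
UNSET = object()

def apply_update(current, update=UNSET):
    """Return the new column value for a tri-state update."""
    if update is UNSET:
        return current  # column not mentioned: leave it alone
    return update       # None clears the column to NULL; a value sets it

assert apply_update(3) == 3            # outer None: no update
assert apply_update(3, None) is None   # Some(None): clear generation_pageserver
assert apply_update(None, 7) == 7      # Some(Some(7)): set it
```

Diesel's `AsChangeset` applies the same convention: an `Option` field equal to `None` is omitted from the `UPDATE`, while `Some(None)` on a nullable column writes NULL.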


@@ -513,6 +513,9 @@ struct ShardUpdate {
     /// If this is None, generation is not updated.
     generation: Option<Generation>,
+
+    /// If this is None, scheduling policy is not updated.
+    scheduling_policy: Option<ShardSchedulingPolicy>,
 }
 enum StopReconciliationsReason {
@@ -2376,6 +2379,23 @@ impl Service {
             }
         };
+        // Ordinarily we do not update scheduling policy, but when making major changes
+        // like detaching or demoting to secondary-only, we need to force the scheduling
+        // mode to Active, or the caller's expected outcome (detach it) will not happen.
+        let scheduling_policy = match req.config.mode {
+            LocationConfigMode::Detached | LocationConfigMode::Secondary => {
+                // Special case: when making major changes like detaching or demoting to secondary-only,
+                // we need to force the scheduling mode to Active, or nothing will happen.
+                Some(ShardSchedulingPolicy::Active)
+            }
+            LocationConfigMode::AttachedMulti
+            | LocationConfigMode::AttachedSingle
+            | LocationConfigMode::AttachedStale => {
+                // While attached, continue to respect whatever the existing scheduling mode is.
+                None
+            }
+        };
+
         let mut create = true;
         for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
             // Saw an existing shard: this is not a creation
@@ -2401,6 +2421,7 @@ impl Service {
                 placement_policy: placement_policy.clone(),
                 tenant_config: req.config.tenant_conf.clone(),
                 generation: set_generation,
+                scheduling_policy,
             });
         }
@@ -2497,6 +2518,7 @@ impl Service {
             placement_policy,
             tenant_config,
             generation,
+            scheduling_policy,
         } in &updates
         {
             self.persistence
@@ -2505,7 +2527,7 @@ impl Service {
                     Some(placement_policy.clone()),
                     Some(tenant_config.clone()),
                     *generation,
-                    None,
+                    *scheduling_policy,
                 )
                 .await?;
         }
@@ -2521,6 +2543,7 @@ impl Service {
             placement_policy,
             tenant_config,
             generation: update_generation,
+            scheduling_policy,
         } in updates
         {
             let Some(shard) = tenants.get_mut(&tenant_shard_id) else {
@@ -2539,6 +2562,10 @@ impl Service {
                 shard.generation = Some(generation);
             }
+            if let Some(scheduling_policy) = scheduling_policy {
+                shard.set_scheduling_policy(scheduling_policy);
+            }
+
             shard.schedule(scheduler, &mut schedule_context)?;
             let maybe_waiter = self.maybe_reconcile_shard(shard, nodes);
@@ -2992,9 +3019,17 @@ impl Service {
         let TenantPolicyRequest {
             placement,
-            scheduling,
+            mut scheduling,
         } = req;
+        if let Some(PlacementPolicy::Detached | PlacementPolicy::Secondary) = placement {
+            // When someone configures a tenant to detach, we force the scheduling policy to enable
+            // this to take effect.
+            if scheduling.is_none() {
+                scheduling = Some(ShardSchedulingPolicy::Active);
+            }
+        }
+
         self.persistence
             .update_tenant_shard(
                 TenantFilter::Tenant(tenant_id),
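The decision table the added `match` implements — force `Active` on detach or demote so the change actually gets reconciled, otherwise leave the shard's policy alone — reduces to a few lines (a hypothetical Python restatement for illustration, not the actual service code):

```python
def scheduling_policy_for(mode):
    """Map a location-config mode to a scheduling-policy override."""
    if mode in ("Detached", "Secondary"):
        # Override so the detach/demote is reconciled even if scheduling
        # was previously paused or stopped.
        return "Active"
    if mode in ("AttachedMulti", "AttachedSingle", "AttachedStale"):
        # None = keep the shard's existing scheduling policy untouched.
        return None
    raise ValueError(f"unknown mode: {mode}")

assert scheduling_policy_for("Detached") == "Active"
assert scheduling_policy_for("AttachedSingle") is None
```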


@@ -0,0 +1,21 @@
# How to run the `pg_regress` tests on a cloud Neon instance.
* Create a Neon project on staging.
* Grant the superuser privileges to the DB user.
* (Optional) create a branch for testing
* Configure the endpoint by updating the control-plane database with the following settings:
  * `Timezone`: `America/Los_Angeles`
* `DateStyle`: `Postgres,MDY`
* `compute_query_id`: `off`
* Check out the actual `Neon` sources
* Patch the SQL and expected files for the specific PostgreSQL version, e.g. for v17:
```bash
$ cd vendor/postgres-v17
$ patch -p1 <../../compute/patches/cloud_regress_pg17.patch
```
* Set the environment variable `BENCHMARK_CONNSTR` to the connection URI of your project.
* Set the environment variable `PG_VERSION` to the version of your project.
* Run
```bash
$ pytest -m remote_cluster -k cloud_regress
```


@@ -5,68 +5,15 @@ Run the regression tests on the cloud instance of Neon
 from __future__ import annotations
 from pathlib import Path
-from typing import Any
-import psycopg2
 import pytest
-from fixtures.log_helper import log
 from fixtures.neon_fixtures import RemotePostgres
 from fixtures.pg_version import PgVersion
-@pytest.fixture
-def setup(remote_pg: RemotePostgres):
-    """
-    Setup and teardown of the tests
-    """
-    with psycopg2.connect(remote_pg.connstr()) as conn:
-        with conn.cursor() as cur:
-            log.info("Creating the extension")
-            cur.execute("CREATE EXTENSION IF NOT EXISTS regress_so")
-            conn.commit()
-            # TODO: Migrate to branches and remove this code
-            log.info("Looking for subscriptions in the regress database")
-            cur.execute(
-                "SELECT subname FROM pg_catalog.pg_subscription WHERE "
-                "subdbid = (SELECT oid FROM pg_catalog.pg_database WHERE datname='regression');"
-            )
-            if cur.rowcount > 0:
-                with psycopg2.connect(
-                    dbname="regression",
-                    host=remote_pg.default_options["host"],
-                    user=remote_pg.default_options["user"],
-                    password=remote_pg.default_options["password"],
-                ) as regress_conn:
-                    with regress_conn.cursor() as regress_cur:
-                        for sub in cur:
-                            regress_cur.execute(f"ALTER SUBSCRIPTION {sub[0]} DISABLE")
-                            regress_cur.execute(
-                                f"ALTER SUBSCRIPTION {sub[0]} SET (slot_name = NONE)"
-                            )
-                            regress_cur.execute(f"DROP SUBSCRIPTION {sub[0]}")
-                    regress_conn.commit()
-
-    yield
-    # TODO: Migrate to branches and remove this code
-    log.info("Looking for extra roles...")
-    with psycopg2.connect(remote_pg.connstr()) as conn:
-        with conn.cursor() as cur:
-            cur.execute(
-                "SELECT rolname FROM pg_catalog.pg_roles WHERE oid > 16384 AND rolname <> 'neondb_owner'"
-            )
-            roles: list[Any] = []
-            for role in cur:
-                log.info("Role found: %s", role[0])
-                roles.append(role[0])
-            for role in roles:
-                cur.execute(f"DROP ROLE {role}")
-            conn.commit()
-
 @pytest.mark.timeout(7200)
 @pytest.mark.remote_cluster
 def test_cloud_regress(
-    setup,
     remote_pg: RemotePostgres,
     pg_version: PgVersion,
     pg_distrib_dir: Path,


@@ -175,6 +175,8 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
     counter("pageserver_tenant_throttling_count_accounted_finish"),
     counter("pageserver_tenant_throttling_wait_usecs_sum"),
     counter("pageserver_tenant_throttling_count"),
+    counter("pageserver_timeline_wal_records_received"),
+    counter("pageserver_page_service_pagestream_flush_in_progress_micros"),
     *histogram("pageserver_page_service_batch_size"),
     *PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
     # "pageserver_directory_entries_count", -- only used if above a certain threshold


@@ -54,23 +54,15 @@ def wait_for_upload(
     tenant: TenantId | TenantShardId,
     timeline: TimelineId,
     lsn: Lsn,
+    timeout=20,
 ):
-    """waits for local timeline upload up to specified lsn"""
-    current_lsn = Lsn(0)
-    for i in range(20):
-        current_lsn = remote_consistent_lsn(pageserver_http, tenant, timeline)
-        if current_lsn >= lsn:
-            log.info("wait finished")
-            return
-        lr_lsn = last_record_lsn(pageserver_http, tenant, timeline)
-        log.info(
-            f"waiting for remote_consistent_lsn to reach {lsn}, now {current_lsn}, last_record_lsn={lr_lsn}, iteration {i + 1}"
-        )
-        time.sleep(1)
-    raise Exception(
-        f"timed out while waiting for {tenant}/{timeline} remote_consistent_lsn to reach {lsn}, was {current_lsn}"
-    )
+    """Waits for local timeline upload up to specified LSN"""
+
+    def is_uploaded():
+        remote_lsn = remote_consistent_lsn(pageserver_http, tenant, timeline)
+        assert remote_lsn >= lsn, f"remote_consistent_lsn at {remote_lsn}"
+
+    wait_until(is_uploaded, name=f"upload to {lsn}", timeout=timeout)
 def _tenant_in_expected_state(tenant_info: dict[str, Any], expected_state: str):
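This hunk replaces hand-rolled polling with the test suite's `wait_until` helper. A rough sketch of what such a helper does — my assumption of its semantics (retry a callback that raises until it succeeds or a deadline passes); the real fixture may differ in details:

```python
import time

def wait_until(func, name="condition", timeout=20.0, interval=0.5):
    """Retry func until it stops raising or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            return func()
        except Exception as e:
            if time.monotonic() >= deadline:
                raise TimeoutError(f"timed out waiting for {name}") from e
            time.sleep(interval)

# A callback that succeeds immediately returns its value on the first try.
assert wait_until(lambda: 42, name="immediate", timeout=1.0) == 42
```

Expressing the condition as an assertion (as `is_uploaded` does) means the failure message carries the last observed value, which the old loop had to log manually.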


@@ -0,0 +1,142 @@
from __future__ import annotations
import random
from concurrent.futures import ThreadPoolExecutor
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.utils import (
wait_for_last_record_lsn,
wait_for_upload,
wait_for_upload_queue_empty,
)
from fixtures.remote_storage import s3_storage
@pytest.mark.timeout(900)
@pytest.mark.parametrize("size", [8, 1024, 8192])
@pytest.mark.parametrize("s3", [True, False], ids=["s3", "local"])
@pytest.mark.parametrize("backpressure", [True, False], ids=["backpressure", "nobackpressure"])
@pytest.mark.parametrize("fsync", [True, False], ids=["fsync", "nofsync"])
def test_ingest_insert_bulk(
request: pytest.FixtureRequest,
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
fsync: bool,
backpressure: bool,
s3: bool,
size: int,
):
"""
Benchmarks ingestion of 5 GB of sequential insert WAL. Measures ingestion and S3 upload
separately. Also does a Safekeeper→Pageserver re-ingestion to measure Pageserver ingestion in
isolation.
"""
CONCURRENCY = 1 # 1 is optimal without fsync or backpressure
VOLUME = 5 * 1024**3
rows = VOLUME // (size + 64) # +64 roughly accounts for per-row WAL overhead
neon_env_builder.safekeepers_enable_fsync = fsync
if s3:
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
# NB: don't use S3 for Safekeeper. It doesn't affect throughput (no backpressure), but it
# would compete with Pageserver for bandwidth.
# neon_env_builder.enable_safekeeper_remote_storage(s3_storage())
neon_env_builder.disable_scrub_on_exit() # immediate shutdown may leave stray layers
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start(
"main",
config_lines=[
f"fsync = {fsync}",
"max_replication_apply_lag = 0",
f"max_replication_flush_lag = {'10GB' if backpressure else '0'}",
# NB: neon_local defaults to 15MB, which is too slow -- production uses 500MB.
f"max_replication_write_lag = {'500MB' if backpressure else '0'}",
],
)
endpoint.safe_psql("create extension neon")
# Wait for the timeline to be propagated to the pageserver.
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
# Ingest rows.
log.info("Ingesting data")
start_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
def insert_rows(endpoint, table, count, value):
with endpoint.connect().cursor() as cur:
cur.execute("set statement_timeout = 0")
cur.execute(f"create table {table} (id int, data bytea)")
cur.execute(f"insert into {table} values (generate_series(1, {count}), %s)", (value,))
with zenbenchmark.record_duration("upload"):
with zenbenchmark.record_duration("ingest"):
with ThreadPoolExecutor(max_workers=CONCURRENCY) as pool:
for i in range(CONCURRENCY):
# Write a random value for all rows. This is sufficient to prevent compression,
# e.g. in TOAST. Randomly generating every row is too slow.
value = random.randbytes(size)
                    worker_rows = rows // CONCURRENCY
pool.submit(insert_rows, endpoint, f"table{i}", worker_rows, value)
end_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
# Wait for pageserver to ingest the WAL.
client = env.pageserver.http_client()
wait_for_last_record_lsn(client, env.initial_tenant, env.initial_timeline, end_lsn)
# Wait for pageserver S3 upload. Checkpoint to flush the last in-memory layer.
client.timeline_checkpoint(
env.initial_tenant,
env.initial_timeline,
compact=False,
wait_until_flushed=False,
)
wait_for_upload(client, env.initial_tenant, env.initial_timeline, end_lsn, timeout=600)
# Empty out upload queue for next benchmark.
wait_for_upload_queue_empty(client, env.initial_tenant, env.initial_timeline)
backpressure_time = endpoint.safe_psql("select backpressure_throttling_time()")[0][0]
# Now that all data is ingested, delete and recreate the tenant in the pageserver. This will
# reingest all the WAL directly from the safekeeper. This gives us a baseline of how fast the
# pageserver can ingest this WAL in isolation.
status = env.storage_controller.inspect(tenant_shard_id=env.initial_tenant)
assert status is not None
endpoint.stop() # avoid spurious getpage errors
client.tenant_delete(env.initial_tenant)
env.pageserver.tenant_create(tenant_id=env.initial_tenant, generation=status[0])
with zenbenchmark.record_duration("recover"):
log.info("Recovering WAL into pageserver")
client.timeline_create(env.pg_version, env.initial_tenant, env.initial_timeline)
wait_for_last_record_lsn(client, env.initial_tenant, env.initial_timeline, end_lsn)
# Emit metrics.
wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024))
zenbenchmark.record("wal_written", wal_written_mb, "MB", MetricReport.TEST_PARAM)
zenbenchmark.record("row_count", rows, "rows", MetricReport.TEST_PARAM)
zenbenchmark.record("concurrency", CONCURRENCY, "clients", MetricReport.TEST_PARAM)
zenbenchmark.record(
"backpressure_time", backpressure_time // 1000, "ms", MetricReport.LOWER_IS_BETTER
)
props = {p["name"]: p["value"] for _, p in request.node.user_properties}
for name in ("ingest", "upload", "recover"):
throughput = int(wal_written_mb / props[name])
zenbenchmark.record(f"{name}_throughput", throughput, "MB/s", MetricReport.HIGHER_IS_BETTER)
# Pageserver shutdown will likely get stuck on the upload queue, just shut it down immediately.
env.stop(immediate=True)


@@ -153,19 +153,20 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
         if i % 10 == 0:
             log.info(f"Running churn round {i}/{churn_rounds} ...")
+            # Run gc-compaction every 10 rounds to ensure the test doesn't take too long.
             ps_http.timeline_compact(
                 tenant_id,
                 timeline_id,
                 enhanced_gc_bottom_most_compaction=True,
                 body={
                     "scheduled": True,
+                    "sub_compaction": True,
                     "compact_range": {
                         "start": "000000000000000000000000000000000000",
-                        # skip the SLRU range for now -- it races with get-lsn-by-timestamp, TODO: fix this
-                        "end": "010000000000000000000000000000000000",
+                        "end": "030000000000000000000000000000000000",
                     },
                 },
             )
         workload.churn_rows(row_count, env.pageserver.id)
@@ -177,6 +178,10 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
     log.info("Validating at workload end ...")
     workload.validate(env.pageserver.id)
+    # Run a legacy compaction+gc to ensure gc-compaction can coexist with legacy compaction.
+    ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)
+    ps_http.timeline_gc(tenant_id, timeline_id, None)
 # Stripe sizes in number of pages.
 TINY_STRIPES = 16


@@ -215,7 +215,7 @@ if SQL_EXPORTER is None:
         #
         # The "host" network mode allows sql_exporter to talk to the
         # endpoint which is running on the host.
-        super().__init__("docker.io/burningalchemist/sql_exporter:0.13.1", network_mode="host")
+        super().__init__("docker.io/burningalchemist/sql_exporter:0.16.0", network_mode="host")
         self.__logs_dir = logs_dir
         self.__port = port


@@ -3230,3 +3230,55 @@ def test_multi_attached_timeline_creation(neon_env_builder: NeonEnvBuilder, migr
         # Always disable 'pause' failpoints, even on failure, to avoid hanging in shutdown
         env.storage_controller.configure_failpoints((migration_failpoint.value, "off"))
         raise
@run_only_on_default_postgres("Postgres version makes no difference here")
def test_storage_controller_detached_stopped(
neon_env_builder: NeonEnvBuilder,
):
"""
Test that detaching a tenant while it has scheduling policy set to Paused or Stop works
"""
remote_storage_kind = s3_storage()
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
neon_env_builder.num_pageservers = 1
env = neon_env_builder.init_configs()
env.start()
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
tenant_id = TenantId.generate()
env.storage_controller.tenant_create(
tenant_id,
shard_count=1,
)
assert len(env.pageserver.http_client().tenant_list_locations()["tenant_shards"]) == 1
# Disable scheduling: ordinarily this would prevent the tenant's configuration being
# reconciled to pageservers, but this should be overridden when detaching.
env.storage_controller.allowed_errors.append(".*Scheduling is disabled by policy.*")
env.storage_controller.tenant_policy_update(
tenant_id,
{"scheduling": "Stop"},
)
env.storage_controller.consistency_check()
# Detach the tenant
virtual_ps_http.tenant_location_conf(
tenant_id,
{
"mode": "Detached",
"secondary_conf": None,
"tenant_conf": {},
"generation": None,
},
)
env.storage_controller.consistency_check()
# Confirm the detach happened
assert env.pageserver.http_client().tenant_list_locations()["tenant_shards"] == []
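The final assertion above checks the pageserver's location list exactly once. In tests like this, where a controller may apply changes asynchronously, a short poll loop is a common defensive pattern. A minimal, generic sketch of such a helper (the `fetch_locations` callable is a hypothetical stand-in, not part of the neon test fixtures):

```python
import time

def wait_until_empty(fetch_locations, timeout=10.0, interval=0.5):
    """Poll fetch_locations() until it returns an empty list, or raise on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if fetch_locations() == []:
            return
        time.sleep(interval)
    raise TimeoutError("locations still non-empty after timeout")

# Toy fetcher standing in for tenant_list_locations(): reports one shard
# on the first call, then none, simulating an in-flight detach completing.
state = {"calls": 0}

def toy_fetch():
    state["calls"] += 1
    return [] if state["calls"] > 1 else ["shard-0"]

wait_until_empty(toy_fetch, timeout=2.0, interval=0.01)
```

Here the helper returns on the second poll, once the toy fetcher reports no remaining shards.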


@@ -33,6 +33,7 @@ deranged = { version = "0.3", default-features = false, features = ["powerfmt",
 digest = { version = "0.10", features = ["mac", "oid", "std"] }
 either = { version = "1" }
 fail = { version = "0.5", default-features = false, features = ["failpoints"] }
+form_urlencoded = { version = "1" }
 futures-channel = { version = "0.3", features = ["sink"] }
 futures-executor = { version = "0.3" }
 futures-io = { version = "0.3" }
@@ -78,6 +79,7 @@ sha2 = { version = "0.10", features = ["asm", "oid"] }
 signature = { version = "2", default-features = false, features = ["digest", "rand_core", "std"] }
 smallvec = { version = "1", default-features = false, features = ["const_new", "write"] }
 spki = { version = "0.7", default-features = false, features = ["pem", "std"] }
+stable_deref_trait = { version = "1" }
 subtle = { version = "2" }
 sync_wrapper = { version = "0.1", default-features = false, features = ["futures"] }
 tikv-jemalloc-ctl = { version = "0.6", features = ["stats", "use_std"] }
@@ -105,6 +107,7 @@ anyhow = { version = "1", features = ["backtrace"] }
 bytes = { version = "1", features = ["serde"] }
 cc = { version = "1", default-features = false, features = ["parallel"] }
 chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
+displaydoc = { version = "0.2" }
 either = { version = "1" }
 getrandom = { version = "0.2", default-features = false, features = ["std"] }
 half = { version = "2", default-features = false, features = ["num-traits"] }