Compare commits

..

14 Commits

Author SHA1 Message Date
Christian Schwarz
980d506bda pageserver: shutdown all walredo managers 8s into shutdown (#8572)
# Motivation

The working theory for hung systemd during PS deploy
(https://github.com/neondatabase/cloud/issues/11387) is that leftover
walredo processes trigger a race condition.

In https://github.com/neondatabase/neon/pull/8150 I arranged that a
clean Tenant shutdown does actually kill its walredo processes.

But many prod machines don't manage to shut down all their tenants until
the 10s systemd timeout hits and, presumably, triggers the race
condition in systemd / the Linux kernel that causes the frozen systemd

# Solution

This PR bolts on a rather ugly mechanism to shut down tenant managers
out of order 8s after we've received the SIGTERM from systemd.

# Changes

- add a global registry of `Weak<WalRedoManager>`
- add a special thread spawned during `shutdown_pageserver` that sleeps
for 8s, then shuts down all redo managers in the registry and prevents
new redo managers from being created
- propagate the new failure mode of tenant spawning throughout the code
base
- make sure shut down tenant manager results in
PageReconstructError::Cancelled so that if Timeline::get calls come in
after the shutdown, they do the right thing
2024-08-01 07:57:09 +02:00
Alex Chi Z.
d6c79b77df test(pageserver): add test_gc_feedback_with_snapshots (#8474)
should be working after https://github.com/neondatabase/neon/pull/8328
gets merged. Part of https://github.com/neondatabase/neon/issues/8002

adds a new perf benchmark case that ensures garbages can be collected
with branches

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-07-31 17:55:19 -04:00
Alexander Bayandin
3350daeb9a CI(create-test-report): fix missing benchmark results in Allure report (#8540)
## Problem

In https://github.com/neondatabase/neon/pull/8241 I've accidentally
removed `create-test-report` dependency on `benchmarks` job

## Summary of changes
- Run `create-test-report` after `benchmarks` job
2024-07-31 19:47:59 +01:00
Arpad Müller
939d50a41c storage_scrubber: migrate FindGarbage to remote_storage (#8548)
Uses the newly added APIs from #8541 named `stream_tenants_generic` and
`stream_objects_with_retries` and extends them with
`list_objects_with_retries_generic` and
`stream_tenant_timelines_generic` to migrate the `find-garbage` command
of the scrubber to `GenericRemoteStorage`.

Part of https://github.com/neondatabase/neon/issues/7547
2024-07-31 18:24:42 +00:00
John Spray
2f9ada13c4 controller: simplify reconciler generation increment logic (#8560)
## Problem

This code was confusing, untested and covered:
- an impossible case, where intent state is AttacheStale (we never do
this)
- a rare edge case (going from AttachedMulti to Attached), which we were
not testing, and in any case the pageserver internally does the same
Tenant reset in this transition as it would do if we incremented
generation.

Closes: https://github.com/neondatabase/neon/issues/8367

## Summary of changes

- Simplify the logic to only skip incrementing the generation if the
location already has the expected generation and the exact same mode.
2024-07-31 18:37:47 +01:00
Cihan Demirci
ff51b565d3 cicd: change Azure storage details [2/2] (#8562)
Change Azure storage configuration to point to updated variables/secrets.

Also update subscription id variable.
2024-07-31 17:42:10 +01:00
Tristan Partin
5e0409de95 Fix negative replication delay metric
In some cases, we can get a negative metric for replication_delay_bytes.
My best guess from all the research I've done is that we evaluate
pg_last_wal_receive_lsn() before pg_last_wal_replay_lsn(), and that by
the time everything is said and done, the replay LSN has advanced past
the receive LSN. In this case, our lag can effectively be modeled as
0 due to the speed of the WAL reception and replay.
2024-07-31 10:16:58 -05:00
Christian Schwarz
4e3b70e308 refactor(page_service): Timeline gate guard holding + cancellation + shutdown (#8339)
Since the introduction of sharding, the protocol handling loop in
`handle_pagerequests` cannot know anymore which concrete
`Tenant`/`Timeline` object any of the incoming `PagestreamFeMessage`
resolves to.
In fact, one message might resolve to one `Tenant`/`Timeline` while
the next one may resolve to another one.

To avoid going to tenant manager, we added the `shard_timelines` which
acted as an ever-growing cache that held timeline gate guards open for
the lifetime of the connection.
The consequence of holding the gate guards open was that we had to be
sensitive to every cached `Timeline::cancel` on each interaction with
the network connection, so that Timeline shutdown would not have to wait
for network connection interaction.

We can do better than that, meaning more efficiency & better
abstraction.
I proposed a sketch for it in

* https://github.com/neondatabase/neon/pull/8286

and this PR implements an evolution of that sketch.

The main idea is is that `mod page_service` shall be solely concerned
with the following:
1. receiving requests by speaking the protocol / pagestream subprotocol
2. dispatching the request to a corresponding method on the correct
shard/`Timeline` object
3. sending response by speaking the protocol / pagestream subprotocol.

The cancellation sensitivity responsibilities are clear cut:
* while in `page_service` code, sensitivity to page_service cancellation
is sufficient
* while in `Timeline` code, sensitivity to `Timeline::cancel` is
sufficient

To enforce these responsibilities, we introduce the notion of a
`timeline::handle::Handle` to a `Timeline` object that is checked out
from a `timeline::handle::Cache` for **each request**.
The `Handle` derefs to `Timeline` and is supposed to be used for a
single async method invocation on `Timeline`.
See the lengthy doc comment in `mod handle` for details of the design.
2024-07-31 17:05:45 +02:00
Alex Chi Z.
61a65f61f3 feat(pageserver): support btm-gc-compaction for child branches (#8519)
part of https://github.com/neondatabase/neon/issues/8002

For child branches, we will pull the image of the modified keys from the
parant into the child branch, which creates a full history for
generating key retention. If there are not enough delta keys, the image
won't be wrote eventually, and we will only keep the deltas inside the
child branch. We could avoid the wasteful work to pull the image from
the parent if we can know the number of deltas in advance, in the future
(currently we always pull image for all modified keys in the child
branch)


---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-07-31 15:48:48 +01:00
Alexander Bayandin
d21246c8bd CI(regress-tests): run less regression tests (#8561)
## Problem
We run regression tests on `release` & `debug` builds for each of the
three supported Postgres versions (6 in total).
With upcoming ARM support and Postgres 17, the number of jobs will jump
to 16, which is a lot.

See the internal discussion here:
https://neondb.slack.com/archives/C033A2WE6BZ/p1722365908404329

## Summary of changes
- Run `regress-tests` job in debug builds only with the latest Postgres
version
- Do not do `debug` builds on release branches
2024-07-31 15:10:27 +01:00
Christian Schwarz
4825b0fec3 compaction_level0_phase1: bypass PS PageCache for data blocks (#8543)
part of https://github.com/neondatabase/neon/issues/8184

# Problem

We want to bypass PS PageCache for all data block reads, but
`compact_level0_phase1` currently uses `ValueRef::load` to load the WAL
records from delta layers.
Internally, that maps to `FileBlockReader:read_blk` which hits the
PageCache
[here](e78341e1c2/pageserver/src/tenant/block_io.rs (L229-L236)).

# Solution

This PR adds a mode for `compact_level0_phase1` that uses the
`MergeIterator` for reading the `Value`s from the delta layer files.

`MergeIterator` is a streaming k-merge that uses vectored blob_io under
the hood, which bypasses the PS PageCache for data blocks.

Other notable changes:
* change the `DiskBtreeReader::into_stream` to buffer the node, instead
of holding a `PageCache` `PageReadGuard`.
* Without this, we run out of page cache slots in
`test_pageserver_compaction_smoke`.
* Generally, `PageReadGuard`s aren't supposed to be held across await
points, so, this is a general bugfix.

# Testing / Validation / Performance

`MergeIterator` has not yet been used in production; it's being
developed as part of
* https://github.com/neondatabase/neon/issues/8002

Therefore, this PR adds a validation mode that compares the existing
approach's value iterator with the new approach's stream output, item by
item.
If they're not identical, we log a warning / fail the unit/regression
test.
To avoid flooding the logs, we apply a global rate limit of once per 10
seconds.
In any case, we use the existing approach's value.

Expected performance impact that will be monitored in staging / nightly
benchmarks / eventually pre-prod:
* with validation:
  * increased CPU usage
  * ~doubled VirtualFile read bytes/second metric
* no change in disk IO usage because the kernel page cache will likely
have the pages buffered on the second read
* without validation:
* slightly higher DRAM usage because each iterator participating in the
k-merge has a dedicated buffer (as opposed to before, where compactions
would rely on the PS PageCaceh as a shared evicting buffer)
* less disk IO if previously there were repeat PageCache misses (likely
case on a busy production Pageserver)
* lower CPU usage: PageCache out of the picture, fewer syscalls are made
(vectored blob io batches reads)

# Rollout

The new code is used with validation mode enabled-by-default.
This gets us validation everywhere by default, specifically in
- Rust unit tests
- Python tests
- Nightly pagebench (shouldn't really matter)
- Staging

Before the next release, I'll merge the following aws.git PR that
configures prod to continue using the existing behavior:

* https://github.com/neondatabase/aws/pull/1663

# Interactions With Other Features

This work & rollout should complete before Direct IO is enabled because
Direct IO would double the IOPS & latency for each compaction read
(#8240).

# Future Work

The streaming k-merge's memory usage is proportional to the amount of
memory per participating layer.

But `compact_level0_phase1` still loads all keys into memory for
`all_keys_iter`.
Thus, it continues to have active memory usage proportional to the
number of keys involved in the compaction.

Future work should replace `all_keys_iter` with a streaming keys
iterator.
This PR has a draft in its first commit, which I later reverted because
it's not necessary to achieve the goal of this PR / issue #8184.
2024-07-31 14:17:59 +02:00
Cihan Demirci
a4df3c8488 cicd: change Azure storage details [1/2] (#8553)
Change Azure storage configuration to point to new variables/secrets. They have
the `_NEW` suffix in order not to disrupt any tests while we complete the
switch.
2024-07-30 19:34:15 +00:00
Christian Schwarz
d95b46f3f3 cleanup(compact_level0_phase1): some commentary and wrapping into block expressions (#8544)
Byproduct of scouting done for
https://github.com/neondatabase/neon/issues/8184

refs https://github.com/neondatabase/neon/issues/8184
2024-07-30 18:13:18 +02:00
Yuchen Liang
85bef9f05d feat(scrubber): post scan_metadata results to storage controller (#8502)
Part of #8128, followup to #8480. closes #8421. 

Enable scrubber to optionally post metadata scan health results to
storage controller.

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-07-30 16:07:34 +01:00
30 changed files with 2556 additions and 691 deletions

View File

@@ -19,6 +19,10 @@ on:
description: 'debug or release'
required: true
type: string
pg-versions:
description: 'a json array of postgres versions to run regression tests on'
required: true
type: string
defaults:
run:
@@ -254,7 +258,7 @@ jobs:
strategy:
fail-fast: false
matrix:
pg_version: [ v14, v15, v16 ]
pg_version: ${{ fromJson(inputs.pg-versions) }}
steps:
- uses: actions/checkout@v4
with:
@@ -284,5 +288,5 @@ jobs:
- name: Merge and upload coverage data
if: |
false &&
inputs.build-type == 'debug' && matrix.pg_version == 'v14'
inputs.build-type == 'debug' && matrix.pg_version == 'v16'
uses: ./.github/actions/save-coverage-data

View File

@@ -203,7 +203,8 @@ jobs:
fail-fast: false
matrix:
arch: [ x64 ]
build-type: [ debug, release ]
# Do not build or run tests in debug for release branches
build-type: ${{ fromJson((startsWith(github.ref_name, 'release' && github.event_name == 'push')) && '["release"]' || '["debug", "release"]') }}
include:
- build-type: release
arch: arm64
@@ -213,6 +214,8 @@ jobs:
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}
build-tag: ${{ needs.tag.outputs.build-tag }}
build-type: ${{ matrix.build-type }}
# Run tests on all Postgres versions in release builds and only on the latest version in debug builds
pg-versions: ${{ matrix.build-type == 'release' && '["v14", "v15", "v16"]' || '["v16"]' }}
secrets: inherit
# Keep `benchmarks` job outside of `build-and-test-locally` workflow to make job failures non-blocking
@@ -306,7 +309,7 @@ jobs:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
create-test-report:
needs: [ check-permissions, build-and-test-locally, coverage-report, build-build-tools-image ]
needs: [ check-permissions, build-and-test-locally, coverage-report, build-build-tools-image, benchmarks ]
if: ${{ !cancelled() && contains(fromJSON('["skipped", "success"]'), needs.check-permissions.result) }}
outputs:
report-url: ${{ steps.create-allure-report.outputs.report-url }}
@@ -868,7 +871,7 @@ jobs:
with:
client-id: ${{ secrets.AZURE_DEV_CLIENT_ID }}
tenant-id: ${{ secrets.AZURE_TENANT_ID }}
subscription-id: ${{ secrets.AZURE_SUBSCRIPTION_ID }}
subscription-id: ${{ secrets.AZURE_DEV_SUBSCRIPTION_ID }}
- name: Login to ACR
if: github.ref_name == 'main'

View File

@@ -1,3 +1,4 @@
use std::collections::HashSet;
use std::str::FromStr;
use std::time::{Duration, Instant};
@@ -304,8 +305,8 @@ pub struct MetadataHealthRecord {
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthUpdateRequest {
pub healthy_tenant_shards: Vec<TenantShardId>,
pub unhealthy_tenant_shards: Vec<TenantShardId>,
pub healthy_tenant_shards: HashSet<TenantShardId>,
pub unhealthy_tenant_shards: HashSet<TenantShardId>,
}
#[derive(Serialize, Deserialize, Debug)]

View File

@@ -144,6 +144,7 @@ impl RemotePath {
///
/// The WithDelimiter mode will populate `prefixes` and `keys` in the result. The
/// NoDelimiter mode will only populate `keys`.
#[derive(Copy, Clone)]
pub enum ListingMode {
WithDelimiter,
NoDelimiter,

View File

@@ -17,11 +17,9 @@ use pageserver::config::PageserverIdentity;
use pageserver::control_plane_client::ControlPlaneClient;
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
use pageserver::task_mgr::{COMPUTE_REQUEST_RUNTIME, WALRECEIVER_RUNTIME};
use pageserver::tenant::{secondary, TenantSharedResources};
use pageserver::{
CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener, LibpqEndpointListener,
};
use pageserver::{CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener};
use remote_storage::GenericRemoteStorage;
use tokio::signal::unix::SignalKind;
use tokio::time::Instant;
@@ -31,11 +29,9 @@ use tracing::*;
use metrics::set_build_info_metric;
use pageserver::{
config::PageServerConf,
context::{DownloadBehavior, RequestContext},
deletion_queue::DeletionQueue,
http, page_cache, page_service, task_mgr,
task_mgr::TaskKind,
task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME},
task_mgr::{BACKGROUND_RUNTIME, MGMT_REQUEST_RUNTIME},
tenant::mgr,
virtual_file,
};
@@ -129,6 +125,7 @@ fn main() -> anyhow::Result<()> {
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.get_impl, "starting with get page implementation");
info!(?conf.get_vectored_impl, "starting with vectored get page implementation");
info!(?conf.compact_level0_phase1_value_access, "starting with setting for compact_level0_phase1_value_access");
let tenants_path = conf.tenants_path();
if !tenants_path.exists() {
@@ -593,30 +590,13 @@ fn start_pageserver(
// Spawn a task to listen for libpq connections. It will spawn further tasks
// for each connection. We created the listener earlier already.
let libpq_listener = {
let cancel = CancellationToken::new();
let libpq_ctx = RequestContext::todo_child(
TaskKind::LibpqEndpointListener,
// listener task shouldn't need to download anything. (We will
// create a separate sub-contexts for each connection, with their
// own download behavior. This context is used only to listen and
// accept connections.)
DownloadBehavior::Error,
);
let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
"libpq listener",
page_service::libpq_listener_main(
tenant_manager.clone(),
pg_auth,
pageserver_listener,
conf.pg_auth_type,
libpq_ctx,
cancel.clone(),
),
));
LibpqEndpointListener(CancellableTask { task, cancel })
};
let page_service = page_service::spawn(conf, tenant_manager.clone(), pg_auth, {
let _entered = COMPUTE_REQUEST_RUNTIME.enter(); // TcpListener::from_std requires it
pageserver_listener
.set_nonblocking(true)
.context("set listener to nonblocking")?;
tokio::net::TcpListener::from_std(pageserver_listener).context("create tokio listener")?
});
let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());
@@ -644,7 +624,7 @@ fn start_pageserver(
shutdown_pageserver.take();
pageserver::shutdown_pageserver(
http_endpoint_listener,
libpq_listener,
page_service,
consumption_metrics_tasks,
disk_usage_eviction_task,
&tenant_manager,

View File

@@ -29,6 +29,7 @@ use utils::{
logging::LogFormat,
};
use crate::tenant::timeline::compaction::CompactL0Phase1ValueAccess;
use crate::tenant::vectored_blob_io::MaxVectoredReadBytes;
use crate::tenant::{config::TenantConfOpt, timeline::GetImpl};
use crate::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
@@ -295,6 +296,10 @@ pub struct PageServerConf {
pub ephemeral_bytes_per_memory_kb: usize,
pub l0_flush: L0FlushConfig,
/// This flag is temporary and will be removed after gradual rollout.
/// See <https://github.com/neondatabase/neon/issues/8184>.
pub compact_level0_phase1_value_access: CompactL0Phase1ValueAccess,
}
/// We do not want to store this in a PageServerConf because the latter may be logged
@@ -401,6 +406,8 @@ struct PageServerConfigBuilder {
ephemeral_bytes_per_memory_kb: BuilderValue<usize>,
l0_flush: BuilderValue<L0FlushConfig>,
compact_level0_phase1_value_access: BuilderValue<CompactL0Phase1ValueAccess>,
}
impl PageServerConfigBuilder {
@@ -490,6 +497,7 @@ impl PageServerConfigBuilder {
validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
l0_flush: Set(L0FlushConfig::default()),
compact_level0_phase1_value_access: Set(CompactL0Phase1ValueAccess::default()),
}
}
}
@@ -673,6 +681,10 @@ impl PageServerConfigBuilder {
self.l0_flush = BuilderValue::Set(value);
}
pub fn compact_level0_phase1_value_access(&mut self, value: CompactL0Phase1ValueAccess) {
self.compact_level0_phase1_value_access = BuilderValue::Set(value);
}
pub fn build(self, id: NodeId) -> anyhow::Result<PageServerConf> {
let default = Self::default_values();
@@ -730,6 +742,7 @@ impl PageServerConfigBuilder {
image_compression,
ephemeral_bytes_per_memory_kb,
l0_flush,
compact_level0_phase1_value_access,
}
CUSTOM LOGIC
{
@@ -1002,6 +1015,9 @@ impl PageServerConf {
"l0_flush" => {
builder.l0_flush(utils::toml_edit_ext::deserialize_item(item).context("l0_flush")?)
}
"compact_level0_phase1_value_access" => {
builder.compact_level0_phase1_value_access(utils::toml_edit_ext::deserialize_item(item).context("compact_level0_phase1_value_access")?)
}
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -1086,6 +1102,7 @@ impl PageServerConf {
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
}
}
}
@@ -1327,6 +1344,7 @@ background_task_maximum_delay = '334 s'
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
},
"Correct defaults should be used when no config values are provided"
);
@@ -1401,6 +1419,7 @@ background_task_maximum_delay = '334 s'
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
},
"Should be able to parse all basic config values correctly"
);

View File

@@ -296,6 +296,11 @@ impl From<GetActiveTenantError> for ApiError {
GetActiveTenantError::WaitForActiveTimeout { .. } => {
ApiError::ResourceUnavailable(format!("{}", e).into())
}
GetActiveTenantError::SwitchedTenant => {
// in our HTTP handlers, this error doesn't happen
// TODO: separate error types
ApiError::ResourceUnavailable("switched tenant".into())
}
}
}
}

View File

@@ -12,6 +12,8 @@ pub mod disk_usage_eviction_task;
pub mod http;
pub mod import_datadir;
pub mod l0_flush;
use futures::{stream::FuturesUnordered, StreamExt};
pub use pageserver_api::keyspace;
use tokio_util::sync::CancellationToken;
pub mod aux_file;
@@ -30,14 +32,13 @@ pub mod walingest;
pub mod walrecord;
pub mod walredo;
use crate::task_mgr::TaskKind;
use camino::Utf8Path;
use deletion_queue::DeletionQueue;
use tenant::{
mgr::{BackgroundPurges, TenantManager},
secondary,
};
use tracing::info;
use tracing::{info, info_span};
/// Current storage format version
///
@@ -63,7 +64,6 @@ pub struct CancellableTask {
pub cancel: CancellationToken,
}
pub struct HttpEndpointListener(pub CancellableTask);
pub struct LibpqEndpointListener(pub CancellableTask);
pub struct ConsumptionMetricsTasks(pub CancellableTask);
pub struct DiskUsageEvictionTask(pub CancellableTask);
impl CancellableTask {
@@ -77,7 +77,7 @@ impl CancellableTask {
#[allow(clippy::too_many_arguments)]
pub async fn shutdown_pageserver(
http_listener: HttpEndpointListener,
libpq_listener: LibpqEndpointListener,
page_service: page_service::Listener,
consumption_metrics_worker: ConsumptionMetricsTasks,
disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
tenant_manager: &TenantManager,
@@ -87,10 +87,83 @@ pub async fn shutdown_pageserver(
exit_code: i32,
) {
use std::time::Duration;
// If the orderly shutdown below takes too long, we still want to make
// sure that all walredo processes are killed and wait()ed on by us, not systemd.
//
// (Leftover walredo processes are the hypothesized trigger for the systemd freezes
// that we keep seeing in prod => https://github.com/neondatabase/cloud/issues/11387.
//
// We use a thread instead of a tokio task because the background runtime is likely busy
// with the final flushing / uploads. This activity here has priority, and due to lack
// of scheduling priority feature sin the tokio scheduler, using a separate thread is
// an effective priority booster.
let walredo_extraordinary_shutdown_thread_span = {
let span = info_span!(parent: None, "walredo_extraordinary_shutdown_thread");
span.follows_from(tracing::Span::current());
span
};
let walredo_extraordinary_shutdown_thread_cancel = CancellationToken::new();
let walredo_extraordinary_shutdown_thread = std::thread::spawn({
let walredo_extraordinary_shutdown_thread_cancel =
walredo_extraordinary_shutdown_thread_cancel.clone();
move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let _entered = rt.enter();
let _entered = walredo_extraordinary_shutdown_thread_span.enter();
if let Ok(()) = rt.block_on(tokio::time::timeout(
Duration::from_secs(8),
walredo_extraordinary_shutdown_thread_cancel.cancelled(),
)) {
info!("cancellation requested");
return;
}
let managers = tenant::WALREDO_MANAGERS
.lock()
.unwrap()
// prevents new walredo managers from being inserted
.take()
.expect("only we take()");
// Use FuturesUnordered to get in queue early for each manager's
// heavier_once_cell semaphore wait list.
// Also, for idle tenants that for some reason haven't
// shut down yet, it's quite likely that we're not going
// to get Poll::Pending once.
let mut futs: FuturesUnordered<_> = managers
.into_iter()
.filter_map(|(_, mgr)| mgr.upgrade())
.map(|mgr| async move { tokio::task::unconstrained(mgr.shutdown()).await })
.collect();
info!(count=%futs.len(), "built FuturesUnordered");
let mut last_log_at = std::time::Instant::now();
#[derive(Debug, Default)]
struct Results {
initiated: u64,
already: u64,
}
let mut results = Results::default();
while let Some(we_initiated) = rt.block_on(futs.next()) {
if we_initiated {
results.initiated += 1;
} else {
results.already += 1;
}
if last_log_at.elapsed() > Duration::from_millis(100) {
info!(remaining=%futs.len(), ?results, "progress");
last_log_at = std::time::Instant::now();
}
}
info!(?results, "done");
}
});
// Shut down the libpq endpoint task. This prevents new connections from
// being accepted.
timed(
libpq_listener.0.shutdown(),
let remaining_connections = timed(
page_service.stop_accepting(),
"shutdown LibpqEndpointListener",
Duration::from_secs(1),
)
@@ -108,7 +181,7 @@ pub async fn shutdown_pageserver(
// Shut down any page service tasks: any in-progress work for particular timelines or tenants
// should already have been canclled via mgr::shutdown_all_tenants
timed(
task_mgr::shutdown_tasks(Some(TaskKind::PageRequestHandler), None, None),
remaining_connections.shutdown(),
"shutdown PageRequestHandlers",
Duration::from_secs(1),
)
@@ -162,6 +235,12 @@ pub async fn shutdown_pageserver(
Duration::from_secs(1),
)
.await;
info!("cancel & join walredo_extraordinary_shutdown_thread");
walredo_extraordinary_shutdown_thread_cancel.cancel();
walredo_extraordinary_shutdown_thread.join().unwrap();
info!("walredo_extraordinary_shutdown_thread done");
info!("Shut down successfully completed");
std::process::exit(exit_code);
}

File diff suppressed because it is too large Load Diff

View File

@@ -8,8 +8,7 @@ use std::time::Duration;
pub use pageserver_api::key::{Key, KEY_SIZE};
/// A 'value' stored for a one Key.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum Value {
/// An Image value contains a full copy of the value
Image(Bytes),

View File

@@ -33,6 +33,7 @@ use remote_storage::GenericRemoteStorage;
use remote_storage::TimeoutOrCancel;
use std::collections::BTreeMap;
use std::fmt;
use std::sync::Weak;
use std::time::SystemTime;
use storage_broker::BrokerClientChannel;
use tokio::io::BufReader;
@@ -312,14 +313,66 @@ impl std::fmt::Debug for Tenant {
}
pub(crate) enum WalRedoManager {
Prod(PostgresRedoManager),
Prod(WalredoManagerId, PostgresRedoManager),
#[cfg(test)]
Test(harness::TestRedoManager),
}
impl From<PostgresRedoManager> for WalRedoManager {
fn from(mgr: PostgresRedoManager) -> Self {
Self::Prod(mgr)
#[derive(thiserror::Error, Debug)]
#[error("pageserver is shutting down")]
pub(crate) struct GlobalShutDown;
impl WalRedoManager {
pub(crate) fn new(mgr: PostgresRedoManager) -> Result<Arc<Self>, GlobalShutDown> {
let id = WalredoManagerId::next();
let arc = Arc::new(Self::Prod(id, mgr));
let mut guard = WALREDO_MANAGERS.lock().unwrap();
match &mut *guard {
Some(map) => {
map.insert(id, Arc::downgrade(&arc));
Ok(arc)
}
None => Err(GlobalShutDown),
}
}
}
impl Drop for WalRedoManager {
fn drop(&mut self) {
match self {
Self::Prod(id, _) => {
let mut guard = WALREDO_MANAGERS.lock().unwrap();
if let Some(map) = &mut *guard {
map.remove(id).expect("new() registers, drop() unregisters");
}
}
#[cfg(test)]
Self::Test(_) => {
// Not applicable to test redo manager
}
}
}
}
/// Global registry of all walredo managers so that [`crate::shutdown_pageserver`] can shut down
/// the walredo processes outside of the regular order.
///
/// This is necessary to work around a systemd bug where it freezes if there are
/// walredo processes left => <https://github.com/neondatabase/cloud/issues/11387>
#[allow(clippy::type_complexity)]
pub(crate) static WALREDO_MANAGERS: once_cell::sync::Lazy<
Mutex<Option<HashMap<WalredoManagerId, Weak<WalRedoManager>>>>,
> = once_cell::sync::Lazy::new(|| Mutex::new(Some(HashMap::new())));
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub(crate) struct WalredoManagerId(u64);
impl WalredoManagerId {
pub fn next() -> Self {
static NEXT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
let id = NEXT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if id == 0 {
panic!("WalredoManagerId::new() returned 0, indicating wraparound, risking it's no longer unique");
}
Self(id)
}
}
@@ -331,19 +384,20 @@ impl From<harness::TestRedoManager> for WalRedoManager {
}
impl WalRedoManager {
pub(crate) async fn shutdown(&self) {
pub(crate) async fn shutdown(&self) -> bool {
match self {
Self::Prod(mgr) => mgr.shutdown().await,
Self::Prod(_, mgr) => mgr.shutdown().await,
#[cfg(test)]
Self::Test(_) => {
// Not applicable to test redo manager
true
}
}
}
pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
match self {
Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
Self::Prod(_, mgr) => mgr.maybe_quiesce(idle_timeout),
#[cfg(test)]
Self::Test(_) => {
// Not applicable to test redo manager
@@ -363,7 +417,7 @@ impl WalRedoManager {
pg_version: u32,
) -> Result<bytes::Bytes, walredo::Error> {
match self {
Self::Prod(mgr) => {
Self::Prod(_, mgr) => {
mgr.request_redo(key, lsn, base_img, records, pg_version)
.await
}
@@ -377,7 +431,7 @@ impl WalRedoManager {
pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> {
match self {
WalRedoManager::Prod(m) => Some(m.status()),
WalRedoManager::Prod(_, m) => Some(m.status()),
#[cfg(test)]
WalRedoManager::Test(_) => None,
}
@@ -386,6 +440,8 @@ impl WalRedoManager {
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum GetTimelineError {
#[error("Timeline is shutting down")]
ShuttingDown,
#[error("Timeline {tenant_id}/{timeline_id} is not active, state: {state:?}")]
NotActive {
tenant_id: TenantShardId,
@@ -675,11 +731,9 @@ impl Tenant {
init_order: Option<InitializationOrder>,
mode: SpawnMode,
ctx: &RequestContext,
) -> Arc<Tenant> {
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
conf,
tenant_shard_id,
)));
) -> Result<Arc<Tenant>, GlobalShutDown> {
let wal_redo_manager =
WalRedoManager::new(PostgresRedoManager::new(conf, tenant_shard_id))?;
let TenantSharedResources {
broker_client,
@@ -878,7 +932,7 @@ impl Tenant {
}
.instrument(tracing::info_span!(parent: None, "attach", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), gen=?generation)),
);
tenant
Ok(tenant)
}
#[instrument(skip_all)]
@@ -7347,6 +7401,7 @@ mod tests {
Lsn(0x60),
&[Lsn(0x20), Lsn(0x40), Lsn(0x50)],
3,
None,
)
.await
.unwrap();
@@ -7471,7 +7526,7 @@ mod tests {
),
];
let res = tline
.generate_key_retention(key, &history, Lsn(0x60), &[Lsn(0x40), Lsn(0x50)], 3)
.generate_key_retention(key, &history, Lsn(0x60), &[Lsn(0x40), Lsn(0x50)], 3, None)
.await
.unwrap();
let expected_res = KeyHistoryRetention {
@@ -7517,6 +7572,114 @@ mod tests {
};
assert_eq!(res, expected_res);
// In case of branch compaction, the branch itself does not have the full history, and we need to provide
// the ancestor image in the test case.
let history = vec![
(
key,
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append(";0x20")),
),
(
key,
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append(";0x30")),
),
(
key,
Lsn(0x40),
Value::WalRecord(NeonWalRecord::wal_append(";0x40")),
),
(
key,
Lsn(0x70),
Value::WalRecord(NeonWalRecord::wal_append(";0x70")),
),
];
let res = tline
.generate_key_retention(
key,
&history,
Lsn(0x60),
&[],
3,
Some((key, Lsn(0x10), Bytes::copy_from_slice(b"0x10"))),
)
.await
.unwrap();
let expected_res = KeyHistoryRetention {
below_horizon: vec![(
Lsn(0x60),
KeyLogAtLsn(vec![(
Lsn(0x60),
Value::Image(Bytes::copy_from_slice(b"0x10;0x20;0x30;0x40")), // use the ancestor image to reconstruct the page
)]),
)],
above_horizon: KeyLogAtLsn(vec![(
Lsn(0x70),
Value::WalRecord(NeonWalRecord::wal_append(";0x70")),
)]),
};
assert_eq!(res, expected_res);
let history = vec![
(
key,
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append(";0x20")),
),
(
key,
Lsn(0x40),
Value::WalRecord(NeonWalRecord::wal_append(";0x40")),
),
(
key,
Lsn(0x60),
Value::WalRecord(NeonWalRecord::wal_append(";0x60")),
),
(
key,
Lsn(0x70),
Value::WalRecord(NeonWalRecord::wal_append(";0x70")),
),
];
let res = tline
.generate_key_retention(
key,
&history,
Lsn(0x60),
&[Lsn(0x30)],
3,
Some((key, Lsn(0x10), Bytes::copy_from_slice(b"0x10"))),
)
.await
.unwrap();
let expected_res = KeyHistoryRetention {
below_horizon: vec![
(
Lsn(0x30),
KeyLogAtLsn(vec![(
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append(";0x20")),
)]),
),
(
Lsn(0x60),
KeyLogAtLsn(vec![(
Lsn(0x60),
Value::Image(Bytes::copy_from_slice(b"0x10;0x20;0x40;0x60")),
)]),
),
],
above_horizon: KeyLogAtLsn(vec![(
Lsn(0x70),
Value::WalRecord(NeonWalRecord::wal_append(";0x70")),
)]),
};
assert_eq!(res, expected_res);
Ok(())
}
@@ -7715,4 +7878,186 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_simple_bottom_most_compaction_on_branch() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_simple_bottom_most_compaction_on_branch").await?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {
let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
key.field6 = id;
key
}
let img_layer = (0..10)
.map(|id| (get_key(id), Bytes::from(format!("value {id}@0x10"))))
.collect_vec();
let delta1 = vec![
(
get_key(1),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append("@0x20")),
),
(
get_key(2),
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append("@0x30")),
),
(
get_key(3),
Lsn(0x28),
Value::WalRecord(NeonWalRecord::wal_append("@0x28")),
),
(
get_key(3),
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append("@0x30")),
),
(
get_key(3),
Lsn(0x40),
Value::WalRecord(NeonWalRecord::wal_append("@0x40")),
),
];
let delta2 = vec![
(
get_key(5),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append("@0x20")),
),
(
get_key(6),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append("@0x20")),
),
];
let delta3 = vec![
(
get_key(8),
Lsn(0x48),
Value::WalRecord(NeonWalRecord::wal_append("@0x48")),
),
(
get_key(9),
Lsn(0x48),
Value::WalRecord(NeonWalRecord::wal_append("@0x48")),
),
];
let parent_tline = tenant
.create_test_timeline_with_layers(
TIMELINE_ID,
Lsn(0x10),
DEFAULT_PG_VERSION,
&ctx,
vec![], // delta layers
vec![(Lsn(0x18), img_layer)], // image layers
Lsn(0x18),
)
.await?;
parent_tline.add_extra_test_dense_keyspace(KeySpace::single(get_key(0)..get_key(10)));
let branch_tline = tenant
.branch_timeline_test_with_layers(
&parent_tline,
NEW_TIMELINE_ID,
Some(Lsn(0x18)),
&ctx,
vec![
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta1),
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta2),
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x48)..Lsn(0x50), delta3),
], // delta layers
vec![], // image layers
Lsn(0x50),
)
.await?;
branch_tline.add_extra_test_dense_keyspace(KeySpace::single(get_key(0)..get_key(10)));
{
// Update GC info
let mut guard = parent_tline.gc_info.write().unwrap();
*guard = GcInfo {
retain_lsns: vec![(Lsn(0x18), branch_tline.timeline_id)],
cutoffs: GcCutoffs {
time: Lsn(0x10),
space: Lsn(0x10),
},
leases: Default::default(),
within_ancestor_pitr: false,
};
}
{
// Update GC info
let mut guard = branch_tline.gc_info.write().unwrap();
*guard = GcInfo {
retain_lsns: vec![(Lsn(0x40), branch_tline.timeline_id)],
cutoffs: GcCutoffs {
time: Lsn(0x50),
space: Lsn(0x50),
},
leases: Default::default(),
within_ancestor_pitr: false,
};
}
let expected_result_at_gc_horizon = [
Bytes::from_static(b"value 0@0x10"),
Bytes::from_static(b"value 1@0x10@0x20"),
Bytes::from_static(b"value 2@0x10@0x30"),
Bytes::from_static(b"value 3@0x10@0x28@0x30@0x40"),
Bytes::from_static(b"value 4@0x10"),
Bytes::from_static(b"value 5@0x10@0x20"),
Bytes::from_static(b"value 6@0x10@0x20"),
Bytes::from_static(b"value 7@0x10"),
Bytes::from_static(b"value 8@0x10@0x48"),
Bytes::from_static(b"value 9@0x10@0x48"),
];
let expected_result_at_lsn_40 = [
Bytes::from_static(b"value 0@0x10"),
Bytes::from_static(b"value 1@0x10@0x20"),
Bytes::from_static(b"value 2@0x10@0x30"),
Bytes::from_static(b"value 3@0x10@0x28@0x30@0x40"),
Bytes::from_static(b"value 4@0x10"),
Bytes::from_static(b"value 5@0x10@0x20"),
Bytes::from_static(b"value 6@0x10@0x20"),
Bytes::from_static(b"value 7@0x10"),
Bytes::from_static(b"value 8@0x10"),
Bytes::from_static(b"value 9@0x10"),
];
let verify_result = || async {
for idx in 0..10 {
assert_eq!(
branch_tline
.get(get_key(idx as u32), Lsn(0x50), &ctx)
.await
.unwrap(),
&expected_result_at_gc_horizon[idx]
);
assert_eq!(
branch_tline
.get(get_key(idx as u32), Lsn(0x40), &ctx)
.await
.unwrap(),
&expected_result_at_lsn_40[idx]
);
}
};
verify_result().await;
let cancel = CancellationToken::new();
branch_tline.compact_with_gc(&cancel, &ctx).await.unwrap();
verify_result().await;
Ok(())
}
}

View File

@@ -296,13 +296,19 @@ where
let mut stack = Vec::new();
stack.push((self.root_blk, None));
let block_cursor = self.reader.block_cursor();
let mut node_buf = [0_u8; PAGE_SZ];
while let Some((node_blknum, opt_iter)) = stack.pop() {
// Locate the node.
let node_buf = block_cursor
// Read the node, through the PS PageCache, into local variable `node_buf`.
// We could keep the page cache read guard alive, but, at the time of writing,
// we run quite small PS PageCache s => can't risk running out of
// PageCache space because this stream isn't consumed fast enough.
let page_read_guard = block_cursor
.read_blk(self.start_blk + node_blknum, ctx)
.await?;
node_buf.copy_from_slice(page_read_guard.as_ref());
drop(page_read_guard); // drop page cache read guard early
let node = OnDiskNode::deparse(node_buf.as_ref())?;
let node = OnDiskNode::deparse(&node_buf)?;
let prefix_len = node.prefix_len as usize;
let suffix_len = node.suffix_len as usize;
@@ -345,6 +351,7 @@ where
Either::Left(idx..node.num_children.into())
};
// idx points to the first match now. Keep going from there
while let Some(idx) = iter.next() {
let key_off = idx * suffix_len;

View File

@@ -55,7 +55,7 @@ use utils::id::{TenantId, TimelineId};
use super::remote_timeline_client::remote_tenant_path;
use super::secondary::SecondaryTenant;
use super::timeline::detach_ancestor::PreparedTimelineDetach;
use super::TenantSharedResources;
use super::{GlobalShutDown, TenantSharedResources};
/// For a tenant that appears in TenantsMap, it may either be
/// - `Attached`: has a full Tenant object, is elegible to service
@@ -116,8 +116,6 @@ pub(crate) enum ShardSelector {
/// Only return the 0th shard, if it is present. If a non-0th shard is present,
/// ignore it.
Zero,
/// Pick the first shard we find for the TenantId
First,
/// Pick the shard that holds this key
Page(Key),
/// The shard ID is known: pick the given shard
@@ -667,17 +665,20 @@ pub async fn init_tenant_mgr(
let tenant_dir_path = conf.tenant_path(&tenant_shard_id);
let shard_identity = location_conf.shard;
let slot = match location_conf.mode {
LocationMode::Attached(attached_conf) => TenantSlot::Attached(tenant_spawn(
conf,
tenant_shard_id,
&tenant_dir_path,
resources.clone(),
AttachedTenantConf::new(location_conf.tenant_conf, attached_conf),
shard_identity,
Some(init_order.clone()),
SpawnMode::Lazy,
&ctx,
)),
LocationMode::Attached(attached_conf) => TenantSlot::Attached(
tenant_spawn(
conf,
tenant_shard_id,
&tenant_dir_path,
resources.clone(),
AttachedTenantConf::new(location_conf.tenant_conf, attached_conf),
shard_identity,
Some(init_order.clone()),
SpawnMode::Lazy,
&ctx,
)
.expect("global shutdown during init_tenant_mgr cannot happen"),
),
LocationMode::Secondary(secondary_conf) => {
info!(
tenant_id = %tenant_shard_id.tenant_id,
@@ -725,7 +726,7 @@ fn tenant_spawn(
init_order: Option<InitializationOrder>,
mode: SpawnMode,
ctx: &RequestContext,
) -> Arc<Tenant> {
) -> Result<Arc<Tenant>, GlobalShutDown> {
// All these conditions should have been satisfied by our caller: the tenant dir exists, is a well formed
// path, and contains a configuration file. Assertions that do synchronous I/O are limited to debug mode
// to avoid impacting prod runtime performance.
@@ -1192,7 +1193,10 @@ impl TenantManager {
None,
spawn_mode,
ctx,
);
)
.map_err(|_: GlobalShutDown| {
UpsertLocationError::Unavailable(TenantMapError::ShuttingDown)
})?;
TenantSlot::Attached(tenant)
}
@@ -1313,7 +1317,7 @@ impl TenantManager {
None,
SpawnMode::Eager,
ctx,
);
)?;
slot_guard.upsert(TenantSlot::Attached(tenant))?;
@@ -2047,7 +2051,7 @@ impl TenantManager {
None,
SpawnMode::Eager,
ctx,
);
)?;
slot_guard.upsert(TenantSlot::Attached(tenant))?;
@@ -2088,7 +2092,6 @@ impl TenantManager {
};
match selector {
ShardSelector::First => return ShardResolveResult::Found(tenant.clone()),
ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => {
return ShardResolveResult::Found(tenant.clone())
}
@@ -2170,6 +2173,9 @@ pub(crate) enum GetActiveTenantError {
/// never happen.
#[error("Tenant is broken: {0}")]
Broken(String),
#[error("reconnect to switch tenant id")]
SwitchedTenant,
}
#[derive(Debug, thiserror::Error)]

View File

@@ -3,6 +3,7 @@ pub(crate) mod compaction;
pub mod delete;
pub(crate) mod detach_ancestor;
mod eviction_task;
pub(crate) mod handle;
mod init;
pub mod layer_manager;
pub(crate) mod logical_size;
@@ -17,6 +18,7 @@ use camino::Utf8Path;
use chrono::{DateTime, Utc};
use enumset::EnumSet;
use fail::fail_point;
use handle::ShardTimelineId;
use once_cell::sync::Lazy;
use pageserver_api::{
key::{
@@ -58,7 +60,7 @@ use std::{
sync::atomic::AtomicU64,
};
use std::{
cmp::{max, min, Ordering},
cmp::{max, min},
ops::ControlFlow,
};
use std::{
@@ -74,6 +76,7 @@ use crate::{
metadata::TimelineMetadata,
storage_layer::PersistentLayerDesc,
},
walredo,
};
use crate::{
context::{DownloadBehavior, RequestContext},
@@ -177,25 +180,6 @@ impl std::fmt::Display for ImageLayerCreationMode {
}
}
/// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct Hole {
key_range: Range<Key>,
coverage_size: usize,
}
impl Ord for Hole {
fn cmp(&self, other: &Self) -> Ordering {
other.coverage_size.cmp(&self.coverage_size) // inverse order
}
}
impl PartialOrd for Hole {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
/// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
/// Can be removed after all refactors are done.
fn drop_rlock<T>(rlock: tokio::sync::RwLockReadGuard<T>) {
@@ -443,6 +427,8 @@ pub struct Timeline {
pub(crate) extra_test_dense_keyspace: ArcSwap<KeySpace>,
pub(crate) l0_flush_global_state: L0FlushGlobalState,
pub(crate) handles: handle::PerTimelineState<crate::page_service::TenantManagerTypes>,
}
pub struct WalReceiverInfo {
@@ -548,7 +534,6 @@ impl GetVectoredError {
}
}
#[derive(Debug)]
pub struct MissingKeyError {
key: Key,
shard: ShardNumber,
@@ -559,6 +544,12 @@ pub struct MissingKeyError {
backtrace: Option<std::backtrace::Backtrace>,
}
impl std::fmt::Debug for MissingKeyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self)
}
}
impl std::fmt::Display for MissingKeyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
@@ -1010,7 +1001,10 @@ impl Timeline {
.for_get_kind(GetKind::Singular)
.observe(elapsed.as_secs_f64());
if cfg!(feature = "testing") && res.is_err() {
if cfg!(feature = "testing")
&& res.is_err()
&& !matches!(res, Err(PageReconstructError::Cancelled))
{
// it can only be walredo issue
use std::fmt::Write;
@@ -1929,6 +1923,9 @@ impl Timeline {
tracing::debug!("Cancelling CancellationToken");
self.cancel.cancel();
// Ensure Prevent new page service requests from starting.
self.handles.shutdown();
// Transition the remote_client into a state where it's only useful for timeline deletion.
// (The deletion use case is why we can't just hook up remote_client to Self::cancel).)
self.remote_client.stop();
@@ -2454,6 +2451,8 @@ impl Timeline {
extra_test_dense_keyspace: ArcSwap::new(Arc::new(KeySpace::default())),
l0_flush_global_state: resources.l0_flush_global_state,
handles: Default::default(),
};
result.repartition_threshold =
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
@@ -3723,6 +3722,17 @@ impl Timeline {
&self.shard_identity
}
#[inline(always)]
pub(crate) fn shard_timeline_id(&self) -> ShardTimelineId {
ShardTimelineId {
shard_index: ShardIndex {
shard_number: self.shard_identity.number,
shard_count: self.shard_identity.count,
},
timeline_id: self.timeline_id,
}
}
///
/// Get a handle to the latest layer for appending.
///
@@ -5460,20 +5470,22 @@ impl Timeline {
} else {
trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
};
let img = match self
let res = self
.walredo_mgr
.as_ref()
.context("timeline has no walredo manager")
.map_err(PageReconstructError::WalRedo)?
.request_redo(key, request_lsn, data.img, data.records, self.pg_version)
.await
.context("reconstruct a page image")
{
.await;
let img = match res {
Ok(img) => img,
Err(e) => return Err(PageReconstructError::WalRedo(e)),
Err(walredo::Error::Cancelled) => return Err(PageReconstructError::Cancelled),
Err(walredo::Error::Other(e)) => {
return Err(PageReconstructError::WalRedo(
e.context("reconstruct a page image"),
))
}
};
Ok(img)
}
}

View File

@@ -15,6 +15,7 @@ use super::{
};
use anyhow::{anyhow, Context};
use bytes::Bytes;
use enumset::EnumSet;
use fail::fail_point;
use itertools::Itertools;
@@ -30,8 +31,8 @@ use crate::tenant::config::defaults::{DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPA
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::storage_layer::merge_iterator::MergeIterator;
use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc, ValueReconstructState};
use crate::tenant::timeline::ImageLayerCreationOutcome;
use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
use crate::tenant::timeline::{Hole, ImageLayerCreationOutcome};
use crate::tenant::timeline::{Layer, ResidentLayer};
use crate::tenant::DeltaLayer;
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
@@ -69,17 +70,21 @@ impl KeyHistoryRetention {
self,
key: Key,
delta_writer: &mut Vec<(Key, Lsn, Value)>,
image_writer: &mut ImageLayerWriter,
mut image_writer: Option<&mut ImageLayerWriter>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut first_batch = true;
for (_, KeyLogAtLsn(logs)) in self.below_horizon {
for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
if first_batch {
if logs.len() == 1 && logs[0].1.is_image() {
let Value::Image(img) = &logs[0].1 else {
unreachable!()
};
image_writer.put_image(key, img.clone(), ctx).await?;
if let Some(image_writer) = image_writer.as_mut() {
image_writer.put_image(key, img.clone(), ctx).await?;
} else {
delta_writer.push((key, cutoff_lsn, Value::Image(img.clone())));
}
} else {
for (lsn, val) in logs {
delta_writer.push((key, lsn, val));
@@ -608,66 +613,230 @@ impl Timeline {
.read_lock_held_spawn_blocking_startup_micros
.till_now();
// Determine N largest holes where N is number of compacted layers.
let max_holes = deltas_to_compact.len();
let last_record_lsn = self.get_last_record_lsn();
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
let min_hole_coverage_size = 3; // TODO: something more flexible?
// min-heap (reserve space for one more element added before eviction)
let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
let mut prev: Option<Key> = None;
let mut all_keys = Vec::new();
for l in deltas_to_compact.iter() {
all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
}
// FIXME: should spawn_blocking the rest of this function
// The current stdlib sorting implementation is designed in a way where it is
// particularly fast where the slice is made up of sorted sub-ranges.
all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
// TODO: replace with streaming k-merge
let all_keys = {
let mut all_keys = Vec::new();
for l in deltas_to_compact.iter() {
all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
}
// The current stdlib sorting implementation is designed in a way where it is
// particularly fast where the slice is made up of sorted sub-ranges.
all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
all_keys
};
stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
if let Some(prev_key) = prev {
// just first fast filter, do not create hole entries for metadata keys. The last hole in the
// compaction is the gap between data key and metadata keys.
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
&& !Key::is_metadata_key(&prev_key)
{
let key_range = prev_key..next_key;
// Measuring hole by just subtraction of i128 representation of key range boundaries
// has not so much sense, because largest holes will corresponds field1/field2 changes.
// But we are mostly interested to eliminate holes which cause generation of excessive image layers.
// That is why it is better to measure size of hole as number of covering image layers.
let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
if coverage_size >= min_hole_coverage_size {
heap.push(Hole {
key_range,
coverage_size,
});
if heap.len() > max_holes {
heap.pop(); // remove smallest hole
// Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
//
// A hole is a key range for which this compaction doesn't have any WAL records.
// Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
// cover the hole, but actually don't contain any WAL records for that key range.
// The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
// That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
//
// The algorithm chooses holes as follows.
// - Slide a 2-window over the keys in key orde to get the hole range (=distance between two keys).
// - Filter: min threshold on range length
// - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
//
// For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
#[derive(PartialEq, Eq)]
struct Hole {
key_range: Range<Key>,
coverage_size: usize,
}
let holes: Vec<Hole> = {
use std::cmp::Ordering;
impl Ord for Hole {
fn cmp(&self, other: &Self) -> Ordering {
self.coverage_size.cmp(&other.coverage_size).reverse()
}
}
impl PartialOrd for Hole {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
let max_holes = deltas_to_compact.len();
let last_record_lsn = self.get_last_record_lsn();
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
let min_hole_coverage_size = 3; // TODO: something more flexible?
// min-heap (reserve space for one more element added before eviction)
let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
let mut prev: Option<Key> = None;
for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
if let Some(prev_key) = prev {
// just first fast filter, do not create hole entries for metadata keys. The last hole in the
// compaction is the gap between data key and metadata keys.
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
&& !Key::is_metadata_key(&prev_key)
{
let key_range = prev_key..next_key;
// Measuring hole by just subtraction of i128 representation of key range boundaries
// has not so much sense, because largest holes will corresponds field1/field2 changes.
// But we are mostly interested to eliminate holes which cause generation of excessive image layers.
// That is why it is better to measure size of hole as number of covering image layers.
let coverage_size =
layers.image_coverage(&key_range, last_record_lsn).len();
if coverage_size >= min_hole_coverage_size {
heap.push(Hole {
key_range,
coverage_size,
});
if heap.len() > max_holes {
heap.pop(); // remove smallest hole
}
}
}
}
prev = Some(next_key.next());
}
prev = Some(next_key.next());
}
let mut holes = heap.into_vec();
holes.sort_unstable_by_key(|hole| hole.key_range.start);
holes
};
stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
drop_rlock(guard);
stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
let mut holes = heap.into_vec();
holes.sort_unstable_by_key(|hole| hole.key_range.start);
let mut next_hole = 0; // index of next hole in holes vector
// This iterator walks through all key-value pairs from all the layers
// we're compacting, in key, LSN order.
let all_values_iter = all_keys.iter();
// If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
// then the Value::Image is ordered before Value::WalRecord.
//
// TODO(https://github.com/neondatabase/neon/issues/8184): remove the page cached blob_io
// option and validation code once we've reached confidence.
enum AllValuesIter<'a> {
PageCachedBlobIo {
all_keys_iter: VecIter<'a>,
},
StreamingKmergeBypassingPageCache {
merge_iter: MergeIterator<'a>,
},
ValidatingStreamingKmergeBypassingPageCache {
mode: CompactL0BypassPageCacheValidation,
merge_iter: MergeIterator<'a>,
all_keys_iter: VecIter<'a>,
},
}
type VecIter<'a> = std::slice::Iter<'a, DeltaEntry<'a>>; // TODO: distinguished lifetimes
impl AllValuesIter<'_> {
async fn next_all_keys_iter(
iter: &mut VecIter<'_>,
ctx: &RequestContext,
) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
let Some(DeltaEntry {
key,
lsn,
val: value_ref,
..
}) = iter.next()
else {
return Ok(None);
};
let value = value_ref.load(ctx).await?;
Ok(Some((*key, *lsn, value)))
}
async fn next(
&mut self,
ctx: &RequestContext,
) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
match self {
AllValuesIter::PageCachedBlobIo { all_keys_iter: iter } => {
Self::next_all_keys_iter(iter, ctx).await
}
AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter } => merge_iter.next().await,
AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { mode, merge_iter, all_keys_iter } => async {
// advance both iterators
let all_keys_iter_item = Self::next_all_keys_iter(all_keys_iter, ctx).await;
let merge_iter_item = merge_iter.next().await;
// compare results & log warnings as needed
macro_rules! rate_limited_warn {
($($arg:tt)*) => {{
if cfg!(debug_assertions) || cfg!(feature = "testing") {
warn!($($arg)*);
panic!("CompactL0BypassPageCacheValidation failure, check logs");
}
use once_cell::sync::Lazy;
use utils::rate_limit::RateLimit;
use std::sync::Mutex;
use std::time::Duration;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut rate_limit = LOGGED.lock().unwrap();
rate_limit.call(|| {
warn!($($arg)*);
});
}}
}
match (&all_keys_iter_item, &merge_iter_item) {
(Err(_), Err(_)) => {
// don't bother asserting equivality of the errors
}
(Err(all_keys), Ok(merge)) => {
rate_limited_warn!(?merge, "all_keys_iter returned an error where merge did not: {all_keys:?}");
},
(Ok(all_keys), Err(merge)) => {
rate_limited_warn!(?all_keys, "merge returned an error where all_keys_iter did not: {merge:?}");
},
(Ok(None), Ok(None)) => { }
(Ok(Some(all_keys)), Ok(None)) => {
rate_limited_warn!(?all_keys, "merge returned None where all_keys_iter returned Some");
}
(Ok(None), Ok(Some(merge))) => {
rate_limited_warn!(?merge, "all_keys_iter returned None where merge returned Some");
}
(Ok(Some((all_keys_key, all_keys_lsn, all_keys_value))), Ok(Some((merge_key, merge_lsn, merge_value)))) => {
match mode {
// TODO: in this mode, we still load the value from disk for both iterators, even though we only need the all_keys_iter one
CompactL0BypassPageCacheValidation::KeyLsn => {
let all_keys = (all_keys_key, all_keys_lsn);
let merge = (merge_key, merge_lsn);
if all_keys != merge {
rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN) than all_keys_iter");
}
}
CompactL0BypassPageCacheValidation::KeyLsnValue => {
let all_keys = (all_keys_key, all_keys_lsn, all_keys_value);
let merge = (merge_key, merge_lsn, merge_value);
if all_keys != merge {
rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN,Value) than all_keys_iter");
}
}
}
}
}
// in case of mismatch, trust the legacy all_keys_iter_item
all_keys_iter_item
}.instrument(info_span!("next")).await
}
}
}
let mut all_values_iter = match &self.conf.compact_level0_phase1_value_access {
CompactL0Phase1ValueAccess::PageCachedBlobIo => AllValuesIter::PageCachedBlobIo {
all_keys_iter: all_keys.iter(),
},
CompactL0Phase1ValueAccess::StreamingKmerge { validate } => {
let merge_iter = {
let mut deltas = Vec::with_capacity(deltas_to_compact.len());
for l in deltas_to_compact.iter() {
let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
deltas.push(l);
}
MergeIterator::create(&deltas, &[], ctx)
};
match validate {
None => AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter },
Some(validate) => AllValuesIter::ValidatingStreamingKmergeBypassingPageCache {
mode: validate.clone(),
merge_iter,
all_keys_iter: all_keys.iter(),
},
}
}
};
// This iterator walks through all keys and is needed to calculate size used by each key
let mut all_keys_iter = all_keys
@@ -738,12 +907,13 @@ impl Timeline {
let mut key_values_total_size = 0u64;
let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
let mut next_hole = 0; // index of next hole in holes vector
for &DeltaEntry {
key, lsn, ref val, ..
} in all_values_iter
while let Some((key, lsn, value)) = all_values_iter
.next(ctx)
.await
.map_err(CompactionError::Other)?
{
let value = val.load(ctx).await.map_err(CompactionError::Other)?;
let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
// We need to check key boundaries once we reach next key or end of layer with the same key
if !same_key || lsn == dup_end_lsn {
@@ -928,6 +1098,10 @@ impl Timeline {
}
}
// Without this, rustc complains about deltas_to_compact still
// being borrowed when we `.into_iter()` below.
drop(all_values_iter);
Ok(CompactLevel0Phase1Result {
new_layers,
deltas_to_compact: deltas_to_compact
@@ -1035,6 +1209,43 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
}
}
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum CompactL0Phase1ValueAccess {
/// The old way.
PageCachedBlobIo,
/// The new way.
StreamingKmerge {
/// If set, we run both the old way and the new way, validate that
/// they are identical (=> [`CompactL0BypassPageCacheValidation`]),
/// and if the validation fails,
/// - in tests: fail them with a panic or
/// - in prod, log a rate-limited warning and use the old way's results.
///
/// If not set, we only run the new way and trust its results.
validate: Option<CompactL0BypassPageCacheValidation>,
},
}
/// See [`CompactL0Phase1ValueAccess::StreamingKmerge`].
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum CompactL0BypassPageCacheValidation {
/// Validate that the series of (key, lsn) pairs are the same.
KeyLsn,
/// Validate that the entire output of old and new way is identical.
KeyLsnValue,
}
impl Default for CompactL0Phase1ValueAccess {
fn default() -> Self {
CompactL0Phase1ValueAccess::StreamingKmerge {
// TODO(https://github.com/neondatabase/neon/issues/8184): change to None once confident
validate: Some(CompactL0BypassPageCacheValidation::KeyLsnValue),
}
}
}
impl Timeline {
/// Entry point for new tiered compaction algorithm.
///
@@ -1122,6 +1333,7 @@ impl Timeline {
horizon: Lsn,
retain_lsn_below_horizon: &[Lsn],
delta_threshold_cnt: usize,
base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
) -> anyhow::Result<KeyHistoryRetention> {
// Pre-checks for the invariants
if cfg!(debug_assertions) {
@@ -1151,6 +1363,7 @@ impl Timeline {
);
}
}
let has_ancestor = base_img_from_ancestor.is_some();
// Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
// and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
let (mut split_history, lsn_split_points) = {
@@ -1184,6 +1397,9 @@ impl Timeline {
// For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
// keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
// dropped.
//
// TODO: in case we have both delta + images for a given LSN and it does not exceed the delta
// threshold, we could have kept delta instead to save space. This is an optimization for the future.
continue;
}
}
@@ -1201,9 +1417,13 @@ impl Timeline {
"should have at least below + above horizon batches"
);
let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
if let Some((key, lsn, img)) = base_img_from_ancestor {
replay_history.push((key, lsn, Value::Image(img)));
}
for (i, split_for_lsn) in split_history.into_iter().enumerate() {
// TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
records_since_last_image += split_for_lsn.len();
let generate_image = if i == 0 {
let generate_image = if i == 0 && !has_ancestor {
// We always generate images for the first batch (below horizon / lowest retain_lsn)
true
} else if i == batch_cnt - 1 {
@@ -1326,20 +1546,25 @@ impl Timeline {
retain_lsns_below_horizon.sort();
(selected_layers, gc_cutoff, retain_lsns_below_horizon)
};
let lowest_retain_lsn = retain_lsns_below_horizon
.first()
.copied()
.unwrap_or(gc_cutoff);
if cfg!(debug_assertions) {
assert_eq!(
lowest_retain_lsn,
retain_lsns_below_horizon
.iter()
.min()
.copied()
.unwrap_or(gc_cutoff)
);
}
let lowest_retain_lsn = if self.ancestor_timeline.is_some() {
Lsn(self.ancestor_lsn.0 + 1)
} else {
let res = retain_lsns_below_horizon
.first()
.copied()
.unwrap_or(gc_cutoff);
if cfg!(debug_assertions) {
assert_eq!(
res,
retain_lsns_below_horizon
.iter()
.min()
.copied()
.unwrap_or(gc_cutoff)
);
}
res
};
info!(
"picked {} layers for compaction with gc_cutoff={} lowest_retain_lsn={}",
layer_selection.len(),
@@ -1380,6 +1605,7 @@ impl Timeline {
let mut accumulated_values = Vec::new();
let mut last_key: Option<Key> = None;
#[allow(clippy::too_many_arguments)]
async fn flush_deltas(
deltas: &mut Vec<(Key, Lsn, crate::repository::Value)>,
last_key: Key,
@@ -1388,6 +1614,7 @@ impl Timeline {
tline: &Arc<Timeline>,
lowest_retain_lsn: Lsn,
ctx: &RequestContext,
last_batch: bool,
) -> anyhow::Result<Option<ResidentLayer>> {
// Check if we need to split the delta layer. We split at the original delta layer boundary to avoid
// overlapping layers.
@@ -1408,7 +1635,7 @@ impl Timeline {
*current_delta_split_point += 1;
need_split = true;
}
if !need_split {
if !need_split && !last_batch {
return Ok(None);
}
let deltas = std::mem::take(deltas);
@@ -1433,15 +1660,44 @@ impl Timeline {
Ok(Some(delta_layer))
}
let mut image_layer_writer = ImageLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
&(Key::MIN..Key::MAX), // covers the full key range
lowest_retain_lsn,
ctx,
)
.await?;
// Only create image layers when there is no ancestor branches. TODO: create covering image layer
// when some condition meet.
let mut image_layer_writer = if self.ancestor_timeline.is_none() {
Some(
ImageLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
&(Key::MIN..Key::MAX), // covers the full key range
lowest_retain_lsn,
ctx,
)
.await?,
)
} else {
None
};
/// Returns None if there is no ancestor branch. Throw an error when the key is not found.
///
/// Currently, we always get the ancestor image for each key in the child branch no matter whether the image
/// is needed for reconstruction. This should be fixed in the future.
///
/// Furthermore, we should do vectored get instead of a single get, or better, use k-merge for ancestor
/// images.
async fn get_ancestor_image(
tline: &Arc<Timeline>,
key: Key,
ctx: &RequestContext,
) -> anyhow::Result<Option<(Key, Lsn, Bytes)>> {
if tline.ancestor_timeline.is_none() {
return Ok(None);
};
// This function is implemented as a get of the current timeline at ancestor LSN, therefore reusing
// as much existing code as possible.
let img = tline.get(key, tline.ancestor_lsn, ctx).await?;
Ok(Some((key, tline.ancestor_lsn, img)))
}
let mut delta_values = Vec::new();
let delta_split_points = delta_split_points.into_iter().collect_vec();
@@ -1462,11 +1718,17 @@ impl Timeline {
gc_cutoff,
&retain_lsns_below_horizon,
COMPACTION_DELTA_THRESHOLD,
get_ancestor_image(self, *last_key, ctx).await?,
)
.await?;
// Put the image into the image layer. Currently we have a single big layer for the compaction.
retention
.pipe_to(*last_key, &mut delta_values, &mut image_layer_writer, ctx)
.pipe_to(
*last_key,
&mut delta_values,
image_layer_writer.as_mut(),
ctx,
)
.await?;
delta_layers.extend(
flush_deltas(
@@ -1477,6 +1739,7 @@ impl Timeline {
self,
lowest_retain_lsn,
ctx,
false,
)
.await?,
);
@@ -1495,11 +1758,17 @@ impl Timeline {
gc_cutoff,
&retain_lsns_below_horizon,
COMPACTION_DELTA_THRESHOLD,
get_ancestor_image(self, last_key, ctx).await?,
)
.await?;
// Put the image into the image layer. Currently we have a single big layer for the compaction.
retention
.pipe_to(last_key, &mut delta_values, &mut image_layer_writer, ctx)
.pipe_to(
last_key,
&mut delta_values,
image_layer_writer.as_mut(),
ctx,
)
.await?;
delta_layers.extend(
flush_deltas(
@@ -1510,19 +1779,25 @@ impl Timeline {
self,
lowest_retain_lsn,
ctx,
true,
)
.await?,
);
assert!(delta_values.is_empty(), "unprocessed keys");
let image_layer = image_layer_writer.finish(self, ctx).await?;
let image_layer = if let Some(writer) = image_layer_writer {
Some(writer.finish(self, ctx).await?)
} else {
None
};
info!(
"produced {} delta layers and {} image layers",
delta_layers.len(),
1
if image_layer.is_some() { 1 } else { 0 }
);
let mut compact_to = Vec::new();
compact_to.extend(delta_layers);
compact_to.push(image_layer);
compact_to.extend(image_layer);
// Step 3: Place back to the layer map.
{
let mut guard = self.layers.write().await;

View File

@@ -0,0 +1,967 @@
//! An efficient way to keep the timeline gate open without preventing
//! timeline shutdown for longer than a single call to a timeline method.
//!
//! # Motivation
//!
//! On a single page service connection, we're typically serving a single TenantTimelineId.
//!
//! Without sharding, there is a single Timeline object to which we dispatch
//! all requests. For example, a getpage request gets dispatched to the
//! Timeline::get method of the Timeline object that represents the
//! (tenant,timeline) of that connection.
//!
//! With sharding, for each request that comes in on the connection,
//! we first have to perform shard routing based on the requested key (=~ page number).
//! The result of shard routing is a Timeline object.
//! We then dispatch the request to that Timeline object.
//!
//! Regardless of whether the tenant is sharded or not, we want to ensure that
//! we hold the Timeline gate open while we're invoking the method on the
//! Timeline object.
//!
//! However, we want to avoid the overhead of entering the gate for every
//! method invocation.
//!
//! Further, for shard routing, we want to avoid calling the tenant manager to
//! resolve the shard for every request. Instead, we want to cache the
//! routing result so we can bypass the tenant manager for all subsequent requests
//! that get routed to that shard.
//!
//! Regardless of how we accomplish the above, it should not
//! prevent the Timeline from shutting down promptly.
//!
//! # Design
//!
//! There are three user-facing data structures:
//! - `PerTimelineState`: a struct embedded into each Timeline struct. Lifetime == Timeline lifetime.
//! - `Cache`: a struct private to each connection handler; Lifetime == connection lifetime.
//! - `Handle`: a smart pointer that holds the Timeline gate open and derefs to `&Timeline`.
//! Lifetime: for a single request dispatch on the Timeline (i.e., one getpage request)
//!
//! The `Handle` is just a wrapper around an `Arc<HandleInner>`.
//!
//! There is one long-lived `Arc<HandleInner>`, which is stored in the `PerTimelineState`.
//! The `Cache` stores a `Weak<HandleInner>` for each cached Timeline.
//!
//! To dispatch a request, the page service connection calls `Cache::get`.
//!
//! A cache miss means we consult the tenant manager for shard routing,
//! resulting in an `Arc<Timeline>`. We enter its gate _once_ and construct an
//! `Arc<HandleInner>`. We store a `Weak<HandleInner>` in the cache
//! and the `Arc<HandleInner>` in the `PerTimelineState`.
//!
//! For subsequent requests, `Cache::get` will perform a "fast path" shard routing
//! and find the `Weak<HandleInner>` in the cache.
//! We upgrade the `Weak<HandleInner>` to an `Arc<HandleInner>` and wrap it in the user-facing `Handle` type.
//!
//! The request handler dispatches the request to the right `<Handle as Deref<Target = Timeline>>::$request_method`.
//! It then drops the `Handle`, which drops the `Arc<HandleInner>`.
//!
//! # Memory Management / How The Reference Cycle Is Broken
//!
//! The attentive reader may have noticed the strong reference cycle
//! from `Arc<HandleInner>` to `PerTimelineState` to `Arc<Timeline>`.
//!
//! This cycle is intentional: while it exists, the `Cache` can upgrade its
//! `Weak<HandleInner>` to an `Arc<HandleInner>` in a single atomic operation.
//!
//! The cycle is broken by either
//! - `PerTimelineState::shutdown` or
//! - dropping the `Cache`.
//!
//! Concurrently existing `Handle`s will extend the existence of the cycle.
//! However, since `Handle`s are short-lived and new `Handle`s are not
//! handed out after either `PerTimelineState::shutdown` or `Cache` drop,
//! that extension of the cycle is bounded.
//!
//! # Fast Path for Shard Routing
//!
//! The `Cache` has a fast path for shard routing to avoid calling into
//! the tenant manager for every request.
//!
//! The `Cache` maintains a hash map of `ShardTimelineId` to `Weak<HandleInner>`.
//!
//! The current implementation uses the first entry in the hash map
//! to determine the `ShardParameters` and derive the correct
//! `ShardIndex` for the requested key.
//!
//! It then looks up the hash map for that `ShardTimelineId := {ShardIndex,TimelineId}`.
//!
//! If the lookup is successful and the `Weak<HandleInner>` can be upgraded,
//! it's a hit.
//!
//! ## Cache invalidation
//!
//! The insight is that cache invalidation is sufficient and most efficiently done lazily.
//! The only reasons why an entry in the cache can become stale are:
//! 1. The `PerTimelineState` / Timeline is shutting down e.g. because the shard is
//! being detached, timeline or shard deleted, or pageserver is shutting down.
//! 2. We're doing a shard split and new traffic should be routed to the child shards.
//!
//! Regarding (1), we will eventually fail to upgrade the `Weak<HandleInner>` once the
//! timeline has shut down, and when that happens, we remove the entry from the cache.
//!
//! Regarding (2), the insight is that it is toally fine to keep dispatching requests
//! to the parent shard during a shard split. Eventually, the shard split task will
//! shut down the parent => case (1).
use std::collections::hash_map;
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::Weak;
use pageserver_api::shard::ShardIdentity;
use tracing::instrument;
use tracing::trace;
use utils::id::TimelineId;
use utils::shard::ShardIndex;
use utils::shard::ShardNumber;
use crate::tenant::mgr::ShardSelector;
/// The requirement for Debug is so that #[derive(Debug)] works in some places.
pub(crate) trait Types: Sized + std::fmt::Debug {
type TenantManagerError: Sized + std::fmt::Debug;
type TenantManager: TenantManager<Self> + Sized;
type Timeline: ArcTimeline<Self> + Sized;
}
/// Uniquely identifies a [`Cache`] instance over the lifetime of the process.
/// Required so [`Cache::drop`] can take out the handles from the [`PerTimelineState`].
/// Alternative to this would be to allocate [`Cache`] in a `Box` and identify it by the pointer.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
struct CacheId(u64);
impl CacheId {
fn next() -> Self {
static NEXT_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
let id = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if id == 0 {
panic!("CacheId::new() returned 0, overflow");
}
Self(id)
}
}
/// See module-level comment.
pub(crate) struct Cache<T: Types> {
id: CacheId,
map: Map<T>,
}
type Map<T> = HashMap<ShardTimelineId, Weak<HandleInner<T>>>;
impl<T: Types> Default for Cache<T> {
fn default() -> Self {
Self {
id: CacheId::next(),
map: Default::default(),
}
}
}
#[derive(PartialEq, Eq, Debug, Hash, Clone, Copy)]
pub(crate) struct ShardTimelineId {
pub(crate) shard_index: ShardIndex,
pub(crate) timeline_id: TimelineId,
}
/// See module-level comment.
pub(crate) struct Handle<T: Types>(Arc<HandleInner<T>>);
struct HandleInner<T: Types> {
shut_down: AtomicBool,
timeline: T::Timeline,
// The timeline's gate held open.
_gate_guard: utils::sync::gate::GateGuard,
}
/// Embedded in each [`Types::Timeline`] as the anchor for the only long-lived strong ref to `HandleInner`.
///
/// See module-level comment for details.
pub struct PerTimelineState<T: Types> {
// None = shutting down
handles: Mutex<Option<HashMap<CacheId, Arc<HandleInner<T>>>>>,
}
impl<T: Types> Default for PerTimelineState<T> {
fn default() -> Self {
Self {
handles: Mutex::new(Some(Default::default())),
}
}
}
/// Abstract view of [`crate::tenant::mgr`], for testability.
pub(crate) trait TenantManager<T: Types> {
/// Invoked by [`Cache::get`] to resolve a [`ShardTimelineId`] to a [`Types::Timeline`].
/// Errors are returned as [`GetError::TenantManager`].
async fn resolve(
&self,
timeline_id: TimelineId,
shard_selector: ShardSelector,
) -> Result<T::Timeline, T::TenantManagerError>;
}
/// Abstract view of an [`Arc<Timeline>`], for testability.
pub(crate) trait ArcTimeline<T: Types>: Clone {
fn gate(&self) -> &utils::sync::gate::Gate;
fn shard_timeline_id(&self) -> ShardTimelineId;
fn get_shard_identity(&self) -> &ShardIdentity;
fn per_timeline_state(&self) -> &PerTimelineState<T>;
}
/// Errors returned by [`Cache::get`].
#[derive(Debug)]
pub(crate) enum GetError<T: Types> {
TenantManager(T::TenantManagerError),
TimelineGateClosed,
PerTimelineStateShutDown,
}
/// Internal type used in [`Cache::get`].
enum RoutingResult<T: Types> {
FastPath(Handle<T>),
SlowPath(ShardTimelineId),
NeedConsultTenantManager,
}
impl<T: Types> Cache<T> {
/// See module-level comment for details.
///
/// Does NOT check for the shutdown state of [`Types::Timeline`].
/// Instead, the methods of [`Types::Timeline`] that are invoked through
/// the [`Handle`] are responsible for checking these conditions
/// and if so, return an error that causes the page service to
/// close the connection.
#[instrument(level = "trace", skip_all)]
pub(crate) async fn get(
&mut self,
timeline_id: TimelineId,
shard_selector: ShardSelector,
tenant_manager: &T::TenantManager,
) -> Result<Handle<T>, GetError<T>> {
// terminates because each iteration removes an element from the map
loop {
let handle = self
.get_impl(timeline_id, shard_selector, tenant_manager)
.await?;
if handle.0.shut_down.load(Ordering::Relaxed) {
let removed = self
.map
.remove(&handle.0.timeline.shard_timeline_id())
.expect("invariant of get_impl is that the returned handle is in the map");
assert!(
Weak::ptr_eq(&removed, &Arc::downgrade(&handle.0)),
"shard_timeline_id() incorrect?"
);
} else {
return Ok(handle);
}
}
}
#[instrument(level = "trace", skip_all)]
async fn get_impl(
&mut self,
timeline_id: TimelineId,
shard_selector: ShardSelector,
tenant_manager: &T::TenantManager,
) -> Result<Handle<T>, GetError<T>> {
let miss: ShardSelector = {
let routing_state = self.shard_routing(timeline_id, shard_selector);
match routing_state {
RoutingResult::FastPath(handle) => return Ok(handle),
RoutingResult::SlowPath(key) => match self.map.get(&key) {
Some(cached) => match cached.upgrade() {
Some(upgraded) => return Ok(Handle(upgraded)),
None => {
trace!("handle cache stale");
self.map.remove(&key).unwrap();
ShardSelector::Known(key.shard_index)
}
},
None => ShardSelector::Known(key.shard_index),
},
RoutingResult::NeedConsultTenantManager => shard_selector,
}
};
self.get_miss(timeline_id, miss, tenant_manager).await
}
#[inline(always)]
fn shard_routing(
&mut self,
timeline_id: TimelineId,
shard_selector: ShardSelector,
) -> RoutingResult<T> {
loop {
// terminates because when every iteration we remove an element from the map
let Some((first_key, first_handle)) = self.map.iter().next() else {
return RoutingResult::NeedConsultTenantManager;
};
let Some(first_handle) = first_handle.upgrade() else {
// TODO: dedup with get()
trace!("handle cache stale");
let first_key_owned = *first_key;
self.map.remove(&first_key_owned).unwrap();
continue;
};
let first_handle_shard_identity = first_handle.timeline.get_shard_identity();
let make_shard_index = |shard_num: ShardNumber| ShardIndex {
shard_number: shard_num,
shard_count: first_handle_shard_identity.count,
};
let need_idx = match shard_selector {
ShardSelector::Page(key) => {
make_shard_index(first_handle_shard_identity.get_shard_number(&key))
}
ShardSelector::Zero => make_shard_index(ShardNumber(0)),
ShardSelector::Known(shard_idx) => shard_idx,
};
let need_shard_timeline_id = ShardTimelineId {
shard_index: need_idx,
timeline_id,
};
let first_handle_shard_timeline_id = ShardTimelineId {
shard_index: first_handle_shard_identity.shard_index(),
timeline_id: first_handle.timeline.shard_timeline_id().timeline_id,
};
if need_shard_timeline_id == first_handle_shard_timeline_id {
return RoutingResult::FastPath(Handle(first_handle));
} else {
return RoutingResult::SlowPath(need_shard_timeline_id);
}
}
}
#[instrument(level = "trace", skip_all)]
#[inline(always)]
async fn get_miss(
&mut self,
timeline_id: TimelineId,
shard_selector: ShardSelector,
tenant_manager: &T::TenantManager,
) -> Result<Handle<T>, GetError<T>> {
match tenant_manager.resolve(timeline_id, shard_selector).await {
Ok(timeline) => {
let key = timeline.shard_timeline_id();
match &shard_selector {
ShardSelector::Zero => assert_eq!(key.shard_index.shard_number, ShardNumber(0)),
ShardSelector::Page(_) => (), // gotta trust tenant_manager
ShardSelector::Known(idx) => assert_eq!(idx, &key.shard_index),
}
let gate_guard = match timeline.gate().enter() {
Ok(guard) => guard,
Err(_) => {
return Err(GetError::TimelineGateClosed);
}
};
trace!("creating new HandleInner");
let handle = Arc::new(
// TODO: global metric that keeps track of the number of live HandlerTimeline instances
// so we can identify reference cycle bugs.
HandleInner {
shut_down: AtomicBool::new(false),
_gate_guard: gate_guard,
timeline: timeline.clone(),
},
);
let handle = {
let mut lock_guard = timeline
.per_timeline_state()
.handles
.lock()
.expect("mutex poisoned");
match &mut *lock_guard {
Some(per_timeline_state) => {
let replaced = per_timeline_state.insert(self.id, Arc::clone(&handle));
assert!(replaced.is_none(), "some earlier code left a stale handle");
match self.map.entry(key) {
hash_map::Entry::Occupied(_o) => {
// This cannot not happen because
// 1. we're the _miss_ handle, i.e., `self.map` didn't contain an entry and
// 2. we were holding &mut self during .resolve().await above, so, no other thread can have inserted a handle
// while we were waiting for the tenant manager.
unreachable!()
}
hash_map::Entry::Vacant(v) => {
v.insert(Arc::downgrade(&handle));
handle
}
}
}
None => {
return Err(GetError::PerTimelineStateShutDown);
}
}
};
Ok(Handle(handle))
}
Err(e) => Err(GetError::TenantManager(e)),
}
}
}
impl<T: Types> PerTimelineState<T> {
/// After this method returns, [`Cache::get`] will never again return a [`Handle`]
/// to the [`Types::Timeline`] that embeds this per-timeline state.
/// Even if [`TenantManager::resolve`] would still resolve to it.
///
/// Already-alive [`Handle`]s for will remain open, usable, and keeping the [`ArcTimeline`] alive.
/// That's ok because they're short-lived. See module-level comment for details.
#[instrument(level = "trace", skip_all)]
pub(super) fn shutdown(&self) {
let handles = self
.handles
.lock()
.expect("mutex poisoned")
// NB: this .take() sets locked to None.
// That's what makes future `Cache::get` misses fail.
// Cache hits are taken care of below.
.take();
let Some(handles) = handles else {
trace!("already shut down");
return;
};
for handle in handles.values() {
// Make hits fail.
handle.shut_down.store(true, Ordering::Relaxed);
}
drop(handles);
}
}
impl<T: Types> std::ops::Deref for Handle<T> {
type Target = T::Timeline;
fn deref(&self) -> &Self::Target {
&self.0.timeline
}
}
#[cfg(test)]
impl<T: Types> Drop for HandleInner<T> {
fn drop(&mut self) {
trace!("HandleInner dropped");
}
}
// When dropping a [`Cache`], prune its handles in the [`PerTimelineState`] to break the reference cycle.
impl<T: Types> Drop for Cache<T> {
fn drop(&mut self) {
for (_, weak) in self.map.drain() {
if let Some(strong) = weak.upgrade() {
// handle is still being kept alive in PerTimelineState
let timeline = strong.timeline.per_timeline_state();
let mut handles = timeline.handles.lock().expect("mutex poisoned");
if let Some(handles) = &mut *handles {
let Some(removed) = handles.remove(&self.id) else {
// There could have been a shutdown inbetween us upgrading the weak and locking the mutex.
continue;
};
assert!(Arc::ptr_eq(&removed, &strong));
}
}
}
}
}
#[cfg(test)]
mod tests {
use pageserver_api::{
key::{rel_block_to_key, Key, DBDIR_KEY},
models::ShardParameters,
reltag::RelTag,
shard::ShardStripeSize,
};
use utils::shard::ShardCount;
use super::*;
const FOREVER: std::time::Duration = std::time::Duration::from_secs(u64::MAX);
#[derive(Debug)]
struct TestTypes;
impl Types for TestTypes {
type TenantManagerError = anyhow::Error;
type TenantManager = StubManager;
type Timeline = Arc<StubTimeline>;
}
struct StubManager {
shards: Vec<Arc<StubTimeline>>,
}
struct StubTimeline {
gate: utils::sync::gate::Gate,
id: TimelineId,
shard: ShardIdentity,
per_timeline_state: PerTimelineState<TestTypes>,
myself: Weak<StubTimeline>,
}
impl StubTimeline {
fn getpage(&self) {
// do nothing
}
}
impl ArcTimeline<TestTypes> for Arc<StubTimeline> {
fn gate(&self) -> &utils::sync::gate::Gate {
&self.gate
}
fn shard_timeline_id(&self) -> ShardTimelineId {
ShardTimelineId {
shard_index: self.shard.shard_index(),
timeline_id: self.id,
}
}
fn get_shard_identity(&self) -> &ShardIdentity {
&self.shard
}
fn per_timeline_state(&self) -> &PerTimelineState<TestTypes> {
&self.per_timeline_state
}
}
impl TenantManager<TestTypes> for StubManager {
async fn resolve(
&self,
timeline_id: TimelineId,
shard_selector: ShardSelector,
) -> anyhow::Result<Arc<StubTimeline>> {
for timeline in &self.shards {
if timeline.id == timeline_id {
match &shard_selector {
ShardSelector::Zero if timeline.shard.is_shard_zero() => {
return Ok(Arc::clone(timeline));
}
ShardSelector::Zero => continue,
ShardSelector::Page(key) if timeline.shard.is_key_local(key) => {
return Ok(Arc::clone(timeline));
}
ShardSelector::Page(_) => continue,
ShardSelector::Known(idx) if idx == &timeline.shard.shard_index() => {
return Ok(Arc::clone(timeline));
}
ShardSelector::Known(_) => continue,
}
}
}
anyhow::bail!("not found")
}
}
#[tokio::test(start_paused = true)]
async fn test_timeline_shutdown() {
crate::tenant::harness::setup_logging();
let timeline_id = TimelineId::generate();
let shard0 = Arc::new_cyclic(|myself| StubTimeline {
gate: Default::default(),
id: timeline_id,
shard: ShardIdentity::unsharded(),
per_timeline_state: PerTimelineState::default(),
myself: myself.clone(),
});
let mgr = StubManager {
shards: vec![shard0.clone()],
};
let key = DBDIR_KEY;
let mut cache = Cache::<TestTypes>::default();
//
// fill the cache
//
assert_eq!(
(Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
(2, 1),
"strong: shard0, mgr; weak: myself"
);
let handle: Handle<_> = cache
.get(timeline_id, ShardSelector::Page(key), &mgr)
.await
.expect("we have the timeline");
let handle_inner_weak = Arc::downgrade(&handle.0);
assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
assert_eq!(
(
Weak::strong_count(&handle_inner_weak),
Weak::weak_count(&handle_inner_weak)
),
(2, 2),
"strong: handle, per_timeline_state, weak: handle_inner_weak, cache"
);
assert_eq!(cache.map.len(), 1);
assert_eq!(
(Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
(3, 1),
"strong: handleinner(per_timeline_state), shard0, mgr; weak: myself"
);
drop(handle);
assert_eq!(
(Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
(3, 1),
"strong: handleinner(per_timeline_state), shard0, mgr; weak: myself"
);
//
// demonstrate that Handle holds up gate closure
// but shutdown prevents new handles from being handed out
//
tokio::select! {
_ = shard0.gate.close() => {
panic!("cache and per-timeline handler state keep cache open");
}
_ = tokio::time::sleep(FOREVER) => {
// NB: first poll of close() makes it enter closing state
}
}
let handle = cache
.get(timeline_id, ShardSelector::Page(key), &mgr)
.await
.expect("we have the timeline");
assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
// SHUTDOWN
shard0.per_timeline_state.shutdown(); // keeping handle alive across shutdown
assert_eq!(
1,
Weak::strong_count(&handle_inner_weak),
"through local var handle"
);
assert_eq!(
cache.map.len(),
1,
"this is an implementation detail but worth pointing out: we can't clear the cache from shutdown(), it's cleared on first access after"
);
assert_eq!(
(Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
(3, 1),
"strong: handleinner(via handle), shard0, mgr; weak: myself"
);
// this handle is perfectly usable
handle.getpage();
cache
.get(timeline_id, ShardSelector::Page(key), &mgr)
.await
.err()
.expect("documented behavior: can't get new handle after shutdown, even if there is an alive Handle");
assert_eq!(
cache.map.len(),
0,
"first access after shutdown cleans up the Weak's from the cache"
);
tokio::select! {
_ = shard0.gate.close() => {
panic!("handle is keeping gate open");
}
_ = tokio::time::sleep(FOREVER) => { }
}
drop(handle);
assert_eq!(
0,
Weak::strong_count(&handle_inner_weak),
"the HandleInner destructor already ran"
);
assert_eq!(
(Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
(2, 1),
"strong: shard0, mgr; weak: myself"
);
// closing gate succeeds after dropping handle
tokio::select! {
_ = shard0.gate.close() => { }
_ = tokio::time::sleep(FOREVER) => {
panic!("handle is dropped, no other gate holders exist")
}
}
// map gets cleaned on next lookup
cache
.get(timeline_id, ShardSelector::Page(key), &mgr)
.await
.err()
.expect("documented behavior: can't get new handle after shutdown");
assert_eq!(cache.map.len(), 0);
// ensure all refs to shard0 are gone and we're not leaking anything
let myself = Weak::clone(&shard0.myself);
drop(shard0);
drop(mgr);
assert_eq!(Weak::strong_count(&myself), 0);
}
#[tokio::test]
async fn test_multiple_timelines_and_deletion() {
crate::tenant::harness::setup_logging();
let timeline_a = TimelineId::generate();
let timeline_b = TimelineId::generate();
assert_ne!(timeline_a, timeline_b);
let timeline_a = Arc::new_cyclic(|myself| StubTimeline {
gate: Default::default(),
id: timeline_a,
shard: ShardIdentity::unsharded(),
per_timeline_state: PerTimelineState::default(),
myself: myself.clone(),
});
let timeline_b = Arc::new_cyclic(|myself| StubTimeline {
gate: Default::default(),
id: timeline_b,
shard: ShardIdentity::unsharded(),
per_timeline_state: PerTimelineState::default(),
myself: myself.clone(),
});
let mut mgr = StubManager {
shards: vec![timeline_a.clone(), timeline_b.clone()],
};
let key = DBDIR_KEY;
let mut cache = Cache::<TestTypes>::default();
cache
.get(timeline_a.id, ShardSelector::Page(key), &mgr)
.await
.expect("we have it");
cache
.get(timeline_b.id, ShardSelector::Page(key), &mgr)
.await
.expect("we have it");
assert_eq!(cache.map.len(), 2);
// delete timeline A
timeline_a.per_timeline_state.shutdown();
mgr.shards.retain(|t| t.id != timeline_a.id);
assert!(
mgr.resolve(timeline_a.id, ShardSelector::Page(key))
.await
.is_err(),
"broken StubManager implementation"
);
assert_eq!(
cache.map.len(),
2,
"cache still has a Weak handle to Timeline A"
);
cache
.get(timeline_a.id, ShardSelector::Page(key), &mgr)
.await
.err()
.expect("documented behavior: can't get new handle after shutdown");
assert_eq!(cache.map.len(), 1, "next access cleans up the cache");
cache
.get(timeline_b.id, ShardSelector::Page(key), &mgr)
.await
.expect("we still have it");
}
fn make_relation_key_for_shard(shard: ShardNumber, params: &ShardParameters) -> Key {
rel_block_to_key(
RelTag {
spcnode: 1663,
dbnode: 208101,
relnode: 2620,
forknum: 0,
},
shard.0 as u32 * params.stripe_size.0,
)
}
#[tokio::test(start_paused = true)]
async fn test_shard_split() {
crate::tenant::harness::setup_logging();
let timeline_id = TimelineId::generate();
let parent = Arc::new_cyclic(|myself| StubTimeline {
gate: Default::default(),
id: timeline_id,
shard: ShardIdentity::unsharded(),
per_timeline_state: PerTimelineState::default(),
myself: myself.clone(),
});
let child_params = ShardParameters {
count: ShardCount(2),
stripe_size: ShardStripeSize::default(),
};
let child0 = Arc::new_cyclic(|myself| StubTimeline {
gate: Default::default(),
id: timeline_id,
shard: ShardIdentity::from_params(ShardNumber(0), &child_params),
per_timeline_state: PerTimelineState::default(),
myself: myself.clone(),
});
let child1 = Arc::new_cyclic(|myself| StubTimeline {
gate: Default::default(),
id: timeline_id,
shard: ShardIdentity::from_params(ShardNumber(1), &child_params),
per_timeline_state: PerTimelineState::default(),
myself: myself.clone(),
});
let child_shards_by_shard_number = [child0.clone(), child1.clone()];
let mut cache = Cache::<TestTypes>::default();
// fill the cache with the parent
for i in 0..2 {
let handle = cache
.get(
timeline_id,
ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
&StubManager {
shards: vec![parent.clone()],
},
)
.await
.expect("we have it");
assert!(
Weak::ptr_eq(&handle.myself, &parent.myself),
"mgr returns parent first"
);
drop(handle);
}
//
// SHARD SPLIT: tenant manager changes, but the cache isn't informed
//
// while we haven't shut down the parent, the cache will return the cached parent, even
// if the tenant manager returns the child
for i in 0..2 {
let handle = cache
.get(
timeline_id,
ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
&StubManager {
shards: vec![], // doesn't matter what's in here, the cache is fully loaded
},
)
.await
.expect("we have it");
assert!(
Weak::ptr_eq(&handle.myself, &parent.myself),
"mgr returns parent"
);
drop(handle);
}
let parent_handle = cache
.get(
timeline_id,
ShardSelector::Page(make_relation_key_for_shard(ShardNumber(0), &child_params)),
&StubManager {
shards: vec![parent.clone()],
},
)
.await
.expect("we have it");
assert!(Weak::ptr_eq(&parent_handle.myself, &parent.myself));
// invalidate the cache
parent.per_timeline_state.shutdown();
// the cache will now return the child, even though the parent handle still exists
for i in 0..2 {
let handle = cache
.get(
timeline_id,
ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
&StubManager {
shards: vec![child0.clone(), child1.clone()], // <====== this changed compared to previous loop
},
)
.await
.expect("we have it");
assert!(
Weak::ptr_eq(
&handle.myself,
&child_shards_by_shard_number[i as usize].myself
),
"mgr returns child"
);
drop(handle);
}
// all the while the parent handle kept the parent gate open
tokio::select! {
_ = parent_handle.gate.close() => {
panic!("parent handle is keeping gate open");
}
_ = tokio::time::sleep(FOREVER) => { }
}
drop(parent_handle);
tokio::select! {
_ = parent.gate.close() => { }
_ = tokio::time::sleep(FOREVER) => {
panic!("parent handle is dropped, no other gate holders exist")
}
}
}
#[tokio::test(start_paused = true)]
async fn test_connection_handler_exit() {
crate::tenant::harness::setup_logging();
let timeline_id = TimelineId::generate();
let shard0 = Arc::new_cyclic(|myself| StubTimeline {
gate: Default::default(),
id: timeline_id,
shard: ShardIdentity::unsharded(),
per_timeline_state: PerTimelineState::default(),
myself: myself.clone(),
});
let mgr = StubManager {
shards: vec![shard0.clone()],
};
let key = DBDIR_KEY;
// Simulate 10 connections that's opened, used, and closed
let mut used_handles = vec![];
for _ in 0..10 {
let mut cache = Cache::<TestTypes>::default();
let handle = {
let handle = cache
.get(timeline_id, ShardSelector::Page(key), &mgr)
.await
.expect("we have the timeline");
assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
handle
};
handle.getpage();
used_handles.push(Arc::downgrade(&handle.0));
}
// No handles exist, thus gates are closed and don't require shutdown
assert!(used_handles
.iter()
.all(|weak| Weak::strong_count(weak) == 0));
// ... thus the gate should close immediately, even without shutdown
tokio::select! {
_ = shard0.gate.close() => { }
_ = tokio::time::sleep(FOREVER) => {
panic!("handle is dropped, no other gate holders exist")
}
}
}
}

View File

@@ -241,6 +241,9 @@ impl PostgresRedoManager {
/// Shut down the WAL redo manager.
///
/// Returns `true` if this call was the one that initiated shutdown.
/// `true` may be observed by no caller if the first caller stops polling.
///
/// After this future completes
/// - no redo process is running
/// - no new redo process will be spawned
@@ -250,22 +253,32 @@ impl PostgresRedoManager {
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn shutdown(&self) {
pub async fn shutdown(&self) -> bool {
// prevent new processes from being spawned
let permit = match self.redo_process.get_or_init_detached().await {
let maybe_permit = match self.redo_process.get_or_init_detached().await {
Ok(guard) => {
let (proc, permit) = guard.take_and_deinit();
drop(proc); // this just drops the Arc, its refcount may not be zero yet
permit
if matches!(&*guard, ProcessOnceCell::ManagerShutDown) {
None
} else {
let (proc, permit) = guard.take_and_deinit();
drop(proc); // this just drops the Arc, its refcount may not be zero yet
Some(permit)
}
}
Err(permit) => permit,
Err(permit) => Some(permit),
};
let it_was_us = if let Some(permit) = maybe_permit {
self.redo_process
.set(ProcessOnceCell::ManagerShutDown, permit);
true
} else {
false
};
self.redo_process
.set(ProcessOnceCell::ManagerShutDown, permit);
// wait for ongoing requests to drain and the refcounts of all Arc<WalRedoProcess> that
// we ever launched to drop to zero, which when it happens synchronously kill()s & wait()s
// for the underlying process.
self.launched_processes.close().await;
it_was_us
}
/// This type doesn't have its own background task to check for idleness: we

View File

@@ -67,6 +67,7 @@ FALLBACK_DURATION = {
"test_runner/performance/test_copy.py::test_copy[neon]": 13.817,
"test_runner/performance/test_copy.py::test_copy[vanilla]": 11.736,
"test_runner/performance/test_gc_feedback.py::test_gc_feedback": 575.735,
"test_runner/performance/test_gc_feedback.py::test_gc_feedback_with_snapshots": 575.735,
"test_runner/performance/test_gist_build.py::test_gist_buffering_build[neon]": 14.868,
"test_runner/performance/test_gist_build.py::test_gist_buffering_build[vanilla]": 14.393,
"test_runner/performance/test_latency.py::test_measure_read_latency_heavy_write_workload[neon-1]": 20.588,

View File

@@ -656,11 +656,8 @@ impl Reconciler {
// reconcile this location. This includes locations with different configurations, as well
// as locations with unknown (None) observed state.
// The general case is to increment the generation. However, there are cases
// where this is not necessary:
// - if we are only updating the TenantConf part of the location
// - if we are only changing the attachment mode (e.g. going to attachedmulti or attachedstale)
// and the location was already in the correct generation
// Incrementing generation is the safe general case, but is inefficient for changes that only
// modify some details (e.g. the tenant's config).
let increment_generation = match observed {
None => true,
Some(ObservedStateLocation { conf: None }) => true,
@@ -669,18 +666,11 @@ impl Reconciler {
}) => {
let generations_match = observed.generation == wanted_conf.generation;
use LocationConfigMode::*;
let mode_transition_requires_gen_inc =
match (observed.mode, wanted_conf.mode) {
// Usually the short-lived attachment modes (multi and stale) are only used
// in the case of [`Self::live_migrate`], but it is simple to handle them correctly
// here too. Locations are allowed to go Single->Stale and Multi->Single within the same generation.
(AttachedSingle, AttachedStale) => false,
(AttachedMulti, AttachedSingle) => false,
(lhs, rhs) => lhs != rhs,
};
!generations_match || mode_transition_requires_gen_inc
// We may skip incrementing the generation if the location is already in the expected mode and
// generation. In principle it would also be safe to skip from certain other modes (e.g. AttachedStale),
// but such states are handled inside `live_migrate`, and if we see that state here we're cleaning up
// after a restart/crash, so fall back to the universally safe path of incrementing generation.
!generations_match || (observed.mode != wanted_conf.mode)
}
};

View File

@@ -40,6 +40,11 @@ impl TimelineAnalysis {
garbage_keys: Vec::new(),
}
}
/// Whether a timeline is healthy.
pub(crate) fn is_healthy(&self) -> bool {
self.errors.is_empty() && self.warnings.is_empty()
}
}
pub(crate) async fn branch_cleanup_and_check_errors(

View File

@@ -19,8 +19,8 @@ use utils::id::TenantId;
use crate::{
cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData},
init_remote, init_remote_generic, list_objects_with_retries,
metadata_stream::{stream_tenant_timelines, stream_tenants},
init_remote_generic, list_objects_with_retries_generic,
metadata_stream::{stream_tenant_timelines_generic, stream_tenants_generic},
BucketConfig, ConsoleConfig, NodeKind, TenantShardTimelineId, TraversingDepth,
};
@@ -153,7 +153,7 @@ async fn find_garbage_inner(
node_kind: NodeKind,
) -> anyhow::Result<GarbageList> {
// Construct clients for S3 and for Console API
let (s3_client, target) = init_remote(bucket_config.clone(), node_kind).await?;
let (remote_client, target) = init_remote_generic(bucket_config.clone(), node_kind).await?;
let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(console_config));
// Build a set of console-known tenants, for quickly eliminating known-active tenants without having
@@ -179,7 +179,7 @@ async fn find_garbage_inner(
// Enumerate Tenants in S3, and check if each one exists in Console
tracing::info!("Finding all tenants in bucket {}...", bucket_config.bucket);
let tenants = stream_tenants(&s3_client, &target);
let tenants = stream_tenants_generic(&remote_client, &target);
let tenants_checked = tenants.map_ok(|t| {
let api_client = cloud_admin_api_client.clone();
let console_cache = console_cache.clone();
@@ -237,25 +237,26 @@ async fn find_garbage_inner(
// Special case: If it's missing in console, check for known bugs that would enable us to conclusively
// identify it as purge-able anyway
if console_result.is_none() {
let timelines = stream_tenant_timelines(&s3_client, &target, tenant_shard_id)
.await?
.collect::<Vec<_>>()
.await;
let timelines =
stream_tenant_timelines_generic(&remote_client, &target, tenant_shard_id)
.await?
.collect::<Vec<_>>()
.await;
if timelines.is_empty() {
// No timelines, but a heatmap: the deletion bug where we deleted everything but heatmaps
let tenant_objects = list_objects_with_retries(
&s3_client,
let tenant_objects = list_objects_with_retries_generic(
&remote_client,
ListingMode::WithDelimiter,
&target.tenant_root(&tenant_shard_id),
None,
)
.await?;
let object = tenant_objects.contents.as_ref().unwrap().first().unwrap();
if object.key.as_ref().unwrap().ends_with("heatmap-v1.json") {
let object = tenant_objects.keys.first().unwrap();
if object.key.get_path().as_str().ends_with("heatmap-v1.json") {
tracing::info!("Tenant {tenant_shard_id}: is missing in console and is only a heatmap (known historic deletion bug)");
garbage.append_buggy(GarbageEntity::Tenant(tenant_shard_id));
continue;
} else {
tracing::info!("Tenant {tenant_shard_id} is missing in console and contains one object: {}", object.key.as_ref().unwrap());
tracing::info!("Tenant {tenant_shard_id} is missing in console and contains one object: {}", object.key);
}
} else {
// A console-unknown tenant with timelines: check if these timelines only contain initdb.tar.zst, from the initial
@@ -264,24 +265,18 @@ async fn find_garbage_inner(
for timeline_r in timelines {
let timeline = timeline_r?;
let timeline_objects = list_objects_with_retries(
&s3_client,
let timeline_objects = list_objects_with_retries_generic(
&remote_client,
ListingMode::WithDelimiter,
&target.timeline_root(&timeline),
None,
)
.await?;
if timeline_objects
.common_prefixes
.as_ref()
.map(|v| v.len())
.unwrap_or(0)
> 0
{
if !timeline_objects.prefixes.is_empty() {
// Sub-paths? Unexpected
any_non_initdb = true;
} else {
let object = timeline_objects.contents.as_ref().unwrap().first().unwrap();
if object.key.as_ref().unwrap().ends_with("initdb.tar.zst") {
let object = timeline_objects.keys.first().unwrap();
if object.key.get_path().as_str().ends_with("initdb.tar.zst") {
tracing::info!("Timeline {timeline} contains only initdb.tar.zst");
} else {
any_non_initdb = true;
@@ -336,7 +331,8 @@ async fn find_garbage_inner(
// Construct a stream of all timelines within active tenants
let active_tenants = tokio_stream::iter(active_tenants.iter().map(Ok));
let timelines = active_tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, *t));
let timelines =
active_tenants.map_ok(|t| stream_tenant_timelines_generic(&remote_client, &target, *t));
let timelines = timelines.try_buffer_unordered(S3_CONCURRENCY);
let timelines = timelines.try_flatten();

View File

@@ -32,6 +32,7 @@ use remote_storage::{
};
use reqwest::Url;
use serde::{Deserialize, Serialize};
use storage_controller_client::control_api;
use tokio::io::AsyncReadExt;
use tokio_util::sync::CancellationToken;
use tracing::error;
@@ -255,6 +256,12 @@ pub struct ControllerClientConfig {
pub controller_jwt: String,
}
impl ControllerClientConfig {
pub fn build_client(self) -> control_api::Client {
control_api::Client::new(self.controller_api, Some(self.controller_jwt))
}
}
pub struct ConsoleConfig {
pub token: String,
pub base_url: Url,
@@ -420,6 +427,7 @@ async fn list_objects_with_retries(
Err(anyhow!("unreachable unless MAX_RETRIES==0"))
}
/// Listing possibly large amounts of keys in a streaming fashion.
fn stream_objects_with_retries<'a>(
storage_client: &'a GenericRemoteStorage,
listing_mode: ListingMode,
@@ -458,6 +466,45 @@ fn stream_objects_with_retries<'a>(
}
}
/// If you want to list a bounded amount of prefixes or keys. For larger numbers of keys/prefixes,
/// use [`stream_objects_with_retries`] instead.
async fn list_objects_with_retries_generic(
remote_client: &GenericRemoteStorage,
listing_mode: ListingMode,
s3_target: &S3Target,
) -> anyhow::Result<Listing> {
let cancel = CancellationToken::new();
let prefix_str = &s3_target
.prefix_in_bucket
.strip_prefix("/")
.unwrap_or(&s3_target.prefix_in_bucket);
let prefix = RemotePath::from_string(prefix_str)?;
for trial in 0..MAX_RETRIES {
match remote_client
.list(Some(&prefix), listing_mode, None, &cancel)
.await
{
Ok(response) => return Ok(response),
Err(e) => {
if trial == MAX_RETRIES - 1 {
return Err(e)
.with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
}
error!(
"list_objects_v2 query failed: bucket_name={}, prefix={}, delimiter={}, error={}",
s3_target.bucket_name,
s3_target.prefix_in_bucket,
s3_target.delimiter,
DisplayErrorContext(e),
);
let backoff_time = 1 << trial.max(5);
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
}
}
}
panic!("MAX_RETRIES is not allowed to be 0");
}
async fn download_object_with_retries(
s3_client: &Client,
bucket_name: &str,

View File

@@ -1,7 +1,8 @@
use anyhow::{anyhow, bail};
use camino::Utf8PathBuf;
use pageserver_api::controller_api::{MetadataHealthUpdateRequest, MetadataHealthUpdateResponse};
use pageserver_api::shard::TenantShardId;
use reqwest::Url;
use reqwest::{Method, Url};
use storage_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
use storage_scrubber::pageserver_physical_gc::GcMode;
use storage_scrubber::scan_pageserver_metadata::scan_metadata;
@@ -61,6 +62,8 @@ enum Command {
json: bool,
#[arg(long = "tenant-id", num_args = 0..)]
tenant_ids: Vec<TenantShardId>,
#[arg(long = "post", default_value_t = false)]
post_to_storage_controller: bool,
#[arg(long, default_value = None)]
/// For safekeeper node_kind only, points to db with debug dump
dump_db_connstr: Option<String>,
@@ -116,11 +119,20 @@ async fn main() -> anyhow::Result<()> {
chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
));
let controller_client_conf = cli.controller_api.map(|controller_api| {
ControllerClientConfig {
controller_api,
// Default to no key: this is a convenience when working in a development environment
controller_jwt: cli.controller_jwt.unwrap_or("".to_owned()),
}
});
match cli.command {
Command::ScanMetadata {
json,
tenant_ids,
node_kind,
post_to_storage_controller,
dump_db_connstr,
dump_db_table,
} => {
@@ -159,6 +171,9 @@ async fn main() -> anyhow::Result<()> {
}
Ok(())
} else {
if controller_client_conf.is_none() && post_to_storage_controller {
return Err(anyhow!("Posting pageserver scan health status to storage controller requires `--controller-api` and `--controller-jwt` to run"));
}
match scan_metadata(bucket_config.clone(), tenant_ids).await {
Err(e) => {
tracing::error!("Failed: {e}");
@@ -170,6 +185,21 @@ async fn main() -> anyhow::Result<()> {
} else {
println!("{}", summary.summary_string());
}
if post_to_storage_controller {
if let Some(conf) = controller_client_conf {
let controller_client = conf.build_client();
let body = summary.build_health_update_request();
controller_client
.dispatch::<MetadataHealthUpdateRequest, MetadataHealthUpdateResponse>(
Method::POST,
"control/v1/metadata_health/update".to_string(),
Some(body),
)
.await?;
}
}
if summary.is_fatal() {
Err(anyhow::anyhow!("Fatal scrub errors detected"))
} else if summary.is_empty() {
@@ -217,14 +247,6 @@ async fn main() -> anyhow::Result<()> {
min_age,
mode,
} => {
let controller_client_conf = cli.controller_api.map(|controller_api| {
ControllerClientConfig {
controller_api,
// Default to no key: this is a convenience when working in a development environment
controller_jwt: cli.controller_jwt.unwrap_or("".to_owned()),
}
});
match (&controller_client_conf, mode) {
(Some(_), _) => {
// Any mode may run when controller API is set

View File

@@ -189,6 +189,63 @@ pub async fn stream_tenant_timelines<'a>(
})
}
/// Given a `TenantShardId`, output a stream of the timelines within that tenant, discovered
/// using a listing. The listing is done before the stream is built, so that this
/// function can be used to generate concurrency on a stream using buffer_unordered.
pub async fn stream_tenant_timelines_generic<'a>(
remote_client: &'a GenericRemoteStorage,
target: &'a RootTarget,
tenant: TenantShardId,
) -> anyhow::Result<impl Stream<Item = Result<TenantShardTimelineId, anyhow::Error>> + 'a> {
let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
let timelines_target = target.timelines_root(&tenant);
let mut objects_stream = std::pin::pin!(stream_objects_with_retries(
remote_client,
ListingMode::WithDelimiter,
&timelines_target
));
loop {
tracing::debug!("Listing in {tenant}");
let fetch_response = match objects_stream.next().await {
None => break,
Some(Err(e)) => {
timeline_ids.push(Err(e));
break;
}
Some(Ok(r)) => r,
};
let new_entry_ids = fetch_response
.prefixes
.iter()
.filter_map(|prefix| -> Option<&str> {
prefix
.get_path()
.as_str()
.strip_prefix(&timelines_target.prefix_in_bucket)?
.strip_suffix('/')
})
.map(|entry_id_str| {
entry_id_str
.parse::<TimelineId>()
.with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
});
for i in new_entry_ids {
timeline_ids.push(i);
}
}
tracing::debug!("Yielding for {}", tenant);
Ok(stream! {
for i in timeline_ids {
let id = i?;
yield Ok(TenantShardTimelineId::new(tenant, id));
}
})
}
pub(crate) fn stream_listing<'a>(
s3_client: &'a Client,
target: &'a S3Target,

View File

@@ -567,13 +567,7 @@ pub async fn pageserver_physical_gc(
}
// Execute cross-shard GC, using the accumulator's full view of all the shards built in the per-shard GC
let Some(controller_client) = controller_client_conf.as_ref().map(|c| {
let ControllerClientConfig {
controller_api,
controller_jwt,
} = c;
control_api::Client::new(controller_api.clone(), Some(controller_jwt.clone()))
}) else {
let Some(controller_client) = controller_client_conf.map(|c| c.build_client()) else {
tracing::info!("Skipping ancestor layer GC, because no `--controller-api` was specified");
return Ok(summary);
};

View File

@@ -9,12 +9,13 @@ use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimeline
use aws_sdk_s3::Client;
use futures_util::{StreamExt, TryStreamExt};
use pageserver::tenant::remote_timeline_client::remote_layer_path;
use pageserver_api::controller_api::MetadataHealthUpdateRequest;
use pageserver_api::shard::TenantShardId;
use serde::Serialize;
use utils::id::TenantId;
use utils::shard::ShardCount;
#[derive(Serialize)]
#[derive(Serialize, Default)]
pub struct MetadataSummary {
tenant_count: usize,
timeline_count: usize,
@@ -23,19 +24,16 @@ pub struct MetadataSummary {
with_warnings: HashSet<TenantShardTimelineId>,
with_orphans: HashSet<TenantShardTimelineId>,
indices_by_version: HashMap<usize, usize>,
#[serde(skip)]
pub(crate) healthy_tenant_shards: HashSet<TenantShardId>,
#[serde(skip)]
pub(crate) unhealthy_tenant_shards: HashSet<TenantShardId>,
}
impl MetadataSummary {
fn new() -> Self {
Self {
tenant_count: 0,
timeline_count: 0,
timeline_shard_count: 0,
with_errors: HashSet::new(),
with_warnings: HashSet::new(),
with_orphans: HashSet::new(),
indices_by_version: HashMap::new(),
}
Self::default()
}
fn update_data(&mut self, data: &S3TimelineBlobData) {
@@ -54,6 +52,13 @@ impl MetadataSummary {
}
fn update_analysis(&mut self, id: &TenantShardTimelineId, analysis: &TimelineAnalysis) {
if analysis.is_healthy() {
self.healthy_tenant_shards.insert(id.tenant_shard_id);
} else {
self.healthy_tenant_shards.remove(&id.tenant_shard_id);
self.unhealthy_tenant_shards.insert(id.tenant_shard_id);
}
if !analysis.errors.is_empty() {
self.with_errors.insert(*id);
}
@@ -101,6 +106,13 @@ Index versions: {version_summary}
pub fn is_empty(&self) -> bool {
self.timeline_shard_count == 0
}
pub fn build_health_update_request(&self) -> MetadataHealthUpdateRequest {
MetadataHealthUpdateRequest {
healthy_tenant_shards: self.healthy_tenant_shards.clone(),
unhealthy_tenant_shards: self.unhealthy_tenant_shards.clone(),
}
}
}
/// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.

View File

@@ -4401,10 +4401,11 @@ class StorageScrubber:
assert stdout is not None
return stdout
def scan_metadata(self) -> Any:
stdout = self.scrubber_cli(
["scan-metadata", "--node-kind", "pageserver", "--json"], timeout=30
)
def scan_metadata(self, post_to_storage_controller: bool = False) -> Any:
args = ["scan-metadata", "--node-kind", "pageserver", "--json"]
if post_to_storage_controller:
args.append("--post")
stdout = self.scrubber_cli(args, timeout=30)
try:
return json.loads(stdout)

View File

@@ -6,21 +6,8 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
@pytest.mark.timeout(10000)
def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
"""
Test that GC is able to collect all old layers even if them are forming
"stairs" and there are not three delta layers since last image layer.
Information about image layers needed to collect old layers should
be propagated by GC to compaction task which should take in in account
when make a decision which new image layers needs to be created.
NB: this test demonstrates the problem. The source tree contained the
`gc_feedback` mechanism for about 9 months, but, there were problems
with it and it wasn't enabled at runtime.
This PR removed the code: https://github.com/neondatabase/neon/pull/6863
"""
def gc_feedback_impl(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker, mode: str):
assert mode == "normal" or mode == "with_snapshots"
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
@@ -74,6 +61,9 @@ def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
physical_size = client.timeline_detail(tenant_id, timeline_id)["current_physical_size"]
log.info(f"Physical storage size {physical_size}")
if mode == "with_snapshots":
if step == n_steps / 2:
env.neon_cli.create_branch("child")
max_num_of_deltas_above_image = 0
max_total_num_of_deltas = 0
@@ -149,3 +139,37 @@ def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
log.info(f"Writing layer map to {layer_map_path}")
with layer_map_path.open("w") as f:
f.write(json.dumps(client.timeline_layer_map_info(tenant_id, timeline_id)))
@pytest.mark.timeout(10000)
def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
"""
Test that GC is able to collect all old layers even if them are forming
"stairs" and there are not three delta layers since last image layer.
Information about image layers needed to collect old layers should
be propagated by GC to compaction task which should take in in account
when make a decision which new image layers needs to be created.
NB: this test demonstrates the problem. The source tree contained the
`gc_feedback` mechanism for about 9 months, but, there were problems
with it and it wasn't enabled at runtime.
This PR removed the code: https://github.com/neondatabase/neon/pull/6863
And the bottom-most GC-compaction epic resolves the problem.
https://github.com/neondatabase/neon/issues/8002
"""
gc_feedback_impl(neon_env_builder, zenbenchmark, "normal")
@pytest.mark.timeout(10000)
def test_gc_feedback_with_snapshots(
neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker
):
"""
Compared with `test_gc_feedback`, we create a branch without written data (=snapshot) in the middle
of the benchmark, and the bottom-most compaction should collect as much garbage as possible below the GC
horizon. Ideally, there should be images (in an image layer) covering the full range at the branch point,
and images covering the full key range (in a delta layer) at the GC horizon.
"""
gc_feedback_impl(neon_env_builder, zenbenchmark, "with_snapshots")

View File

@@ -440,10 +440,12 @@ def test_scrubber_scan_pageserver_metadata(
assert len(index.layer_metadata) > 0
it = iter(index.layer_metadata.items())
scan_summary = env.storage_scrubber.scan_metadata()
scan_summary = env.storage_scrubber.scan_metadata(post_to_storage_controller=True)
assert not scan_summary["with_warnings"]
assert not scan_summary["with_errors"]
assert env.storage_controller.metadata_health_is_healthy()
# Delete a layer file that is listed in the index.
layer, metadata = next(it)
log.info(f"Deleting {timeline_path}/{layer.to_str()}")
@@ -453,7 +455,17 @@ def test_scrubber_scan_pageserver_metadata(
)
log.info(f"delete response: {delete_response}")
# Check scan summary. Expect it to be a L0 layer so only emit warnings.
# Check scan summary without posting to storage controller. Expect it to be a L0 layer so only emit warnings.
scan_summary = env.storage_scrubber.scan_metadata()
log.info(f"{pprint.pformat(scan_summary)}")
assert len(scan_summary["with_warnings"]) > 0
assert env.storage_controller.metadata_health_is_healthy()
# Now post to storage controller, expect seeing one unhealthy health record
scan_summary = env.storage_scrubber.scan_metadata(post_to_storage_controller=True)
log.info(f"{pprint.pformat(scan_summary)}")
assert len(scan_summary["with_warnings"]) > 0
unhealthy = env.storage_controller.metadata_health_list_unhealthy()["unhealthy_tenant_shards"]
assert len(unhealthy) == 1 and unhealthy[0] == str(tenant_shard_id)

View File

@@ -277,8 +277,12 @@ files:
help: 'Bytes between received and replayed LSN'
key_labels:
values: [replication_delay_bytes]
# We use a GREATEST call here because this calculation can be negative.
# The calculation is not atomic, meaning after we've gotten the receive
# LSN, the replay LSN may have advanced past the receive LSN we
# are using for the calculation.
query: |
SELECT pg_wal_lsn_diff(pg_last_wal_receive_lsn(), pg_last_wal_replay_lsn()) AS replication_delay_bytes;
SELECT GREATEST(0, pg_wal_lsn_diff(pg_last_wal_receive_lsn(), pg_last_wal_replay_lsn())) AS replication_delay_bytes;
- metric_name: replication_delay_seconds
type: gauge