Compare commits

..

37 Commits

Author SHA1 Message Date
Konstantin Knizhnik
8852e8a766 Fix prewarming terminatoin condition 2024-10-27 14:14:06 +02:00
Konstantin Knizhnik
c04ae556c5 Fix comments and other minor refactoring 2024-10-27 09:08:40 +02:00
Konstantin Knizhnik
44b283e107 Increase timeout in test_lfc_prewarm 2024-10-26 08:22:53 +03:00
Konstantin Knizhnik
c7a3359edd Add prewarm_local_cache and get_local_cache_state functions 2024-10-25 23:05:07 +03:00
Konstantin Knizhnik
012a8a360f Fix get_prewarm_info() 2024-10-25 10:11:58 +03:00
Konstantin Knizhnik
0b42695983 Report prewarm progress 2024-10-25 08:26:02 +03:00
Konstantin Knizhnik
284d7b4da6 Report prewarm progress 2024-10-25 08:26:00 +03:00
Konstantin Knizhnik
361fc04cd6 Fix warnings 2024-10-25 08:24:45 +03:00
Konstantin Knizhnik
12f635aa04 Fix warnings 2024-10-25 08:24:45 +03:00
Konstantin Knizhnik
ac6c53b94b Support prewarming of replica 2024-10-25 08:24:45 +03:00
Konstantin Knizhnik
df289738b8 Explain why we do not want to perform prewarm of LFC at replica and how it is avoided now 2024-10-25 08:24:45 +03:00
Konstantin Knizhnik
0d3503a187 Check for number of used pages rather than chunks in test_lfc_prewarm.py 2024-10-25 08:24:44 +03:00
Konstantin Knizhnik
f328d497e1 Wait LFC prewarm completion in the loop in test_lfc_prewarm.py 2024-10-25 08:24:44 +03:00
Konstantin Knizhnik
f971c3a786 Implement LFC prewarm 2024-10-25 08:24:44 +03:00
Vlad Lazar
5069123b6d pageserver: refactor ingest inplace to decouple decoding and handling (#9472)
## Problem

WAL ingest couples decoding of special records with their handling
(updates to the storage engine mostly).
This is a roadblock for our plan to move WAL filtering (and implicitly
decoding) to safekeepers since they cannot
do writes to the storage engine. 

## Summary of changes

This PR decouples the decoding of the special WAL records from their
application. The changes are done in place
and I've done my best to refrain from refactorings and attempted to
preserve the original code as much as possible.

Related: https://github.com/neondatabase/neon/issues/9335
Epic: https://github.com/neondatabase/neon/issues/9329
2024-10-24 17:12:47 +01:00
Alex Chi Z.
fb0406e9d2 refactor(pageserver): refactor split writers using batch layer writer (#9493)
part of https://github.com/neondatabase/neon/issues/9114,
https://github.com/neondatabase/neon/issues/8836,
https://github.com/neondatabase/neon/issues/8362

The split layer writer code can be used in a more general way: the
caller puts unfinished writers into the batch layer writer and let batch
layer writer to ensure the atomicity of the layer produces.

## Summary of changes

* Add batch layer writer, which atomically finishes the layers.
`BatchLayerWriter::finish` is simply a copy-paste from previous split
layer writers.
* Refactor split writers to use the batch layer writer.
* The current split writer tests cover all code path of batch layer
writer.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-10-24 10:49:54 -04:00
Alexander Bayandin
b8a311131e CI: remove git config --add safe.directory hack (#9391)
## Problem

We have `git config --global --add safe.directory ...` leftovers from the
past, but `actions/checkout` does it by default (since v3.0.2, we use v4)

## Summary of changes
- Remove `git config --global --add safe.directory ...` hack
2024-10-24 15:49:26 +01:00
John Spray
d589498c6f storcon: respect Reconciler::cancel during await_lsn (#9486)
## Problem

When a pageserver is misbehaving (e.g. we hit an ingest bug or something
is pathologically slow), the storage controller could get stuck in the
part of live migration that waits for LSNs to catch up. This is a
problem, because it can prevent us migrating the troublesome tenant to
another pageserver.

Closes: https://github.com/neondatabase/cloud/issues/19169

## Summary of changes

- Respect Reconciler::cancel during await_lsn.
2024-10-24 15:23:09 +01:00
Christian Schwarz
6f34f97573 refactor(pageserver(load_remote_timeline)) remove dead code handling absence of IndexPart (#9408)
The code is dead at runtime since we're nowadays always running with
remote storage and treat it as the source of truth during attach.

Clean it up as a preliminary to
https://github.com/neondatabase/neon/pull/9218.

Related: https://github.com/neondatabase/neon/pull/9366
2024-10-24 09:00:22 +01:00
Tristan Partin
b86432c29e Fix buggy sizeof
A sizeof on a pointer on a 64 bit machine is 8 bytes whereas
Entry::old_name is a 64 byte array of characters. There was most likely
no fallout since the string would start with NUL bytes, but best to fix
nonetheless.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2024-10-23 21:52:22 -06:00
Vlad Lazar
ac1205c14c pageserver: add metric for number of zeroed pages on rel extend (#9492)
## Problem

Filling the gap in with zeroes is annoying for sharded ingest. We are
not sure it even happens in reality.

## Summary of Changes

Add one global counter which tracks how many such gap blocks we filled
on relation extends. We can add more metrics once we understand the
scope.
2024-10-23 19:58:28 +01:00
John Spray
e3ff87ce3b tests: avoid using background_process when invoking pg_ctl (#9469)
## Problem

Occasionally, we get failures to start the storage controller's db with
errors like:
```
aborting due to panic at /__w/neon/neon/control_plane/src/background_process.rs:349:67:
claim pid file: lock file

Caused by:
    file is already locked
```
e.g.
https://neon-github-public-dev.s3.amazonaws.com/reports/pr-9428/11380574562/index.html#/testresult/1c68d413ea9ecd4a

This is happening in a stop,start cycle during a test. Presumably the
pidfile from the startup background process is still held at the point
we stop, because we let pg_ctl keep running in the background.

## Summary of changes

- Refactor pg_ctl invocations into a helper
- In the controller's `start` function, use pg_ctl & a wait loop for
pg_isready, instead of using background_process

---------

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2024-10-23 16:29:55 +00:00
Tristan Partin
0595320c87 Protect call to pg_current_wal_lsn() in retained_wal query
We can't call pg_current_wal_lsn() if we are a standby instance (read
replica). Any attempt to call this function while in recovery results
in:

ERROR:  recovery is in progress

Signed-off-by: Tristan Partin <tristan@neon.tech>
2024-10-23 09:55:00 -06:00
Folke Behrens
92d5e0e87a proxy: clear lib.rs of code items (#9479)
We keep lib.rs for crate configs, lint configs and re-exports for the binaries.
2024-10-23 08:21:28 +02:00
Arpad Müller
3a3bd34a28 Rename IndexPart::{from_s3_bytes,to_s3_bytes} (#9481)
We support multiple storage backends now, so remove the `_s3_` from the
name.

Analogous to the names adopted for tenant manifests added in #9444.
2024-10-23 00:34:24 +02:00
Alex Chi Z.
64949a37a9 fix(pageserver): make delta split layer writer finish atomic (#9048)
similar to https://github.com/neondatabase/neon/pull/8841, we make the
delta layer writer atomic when finishing the layers.

## Summary of changes

* `put_value` not taking discard fn anymore
* `finish` decides what layers to keep

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-10-22 22:06:21 +00:00
Arpad Müller
6f8fcdf9ea Timeline offloading persistence (#9444)
Persist timeline offloaded state to S3.

Right now, as of #8907, at each restart of the pageserver, all offloaded
state is lost, so we load the full timeline again. As it starts with an
empty local directory, we might potentially download some files again,
leading to downloads that are ultimately wasteful.

This patch adds support for persisting the offloaded state, allowing us
to never load offloaded timelines in the first place. The persistence
feature is facilitated via a new file in S3 that is tenant-global, which
contains a list of all offloaded timelines. It is updated each time we
offload or unoffload a timeline, and otherwise never touched.

This choice means that tenants where no offloading is happening will not
immediately get a manifest, keeping the change very minimal at the
start.

We leave generation support for future work. It is important to support
generations, as in the worst case, the manifest might be overwritten by
an older generation after a timeline has been unoffloaded (and
unarchived), so the next pageserver process instantiation might wrongly
believe that some timeline is still offloaded even though it should be
active.

Part of #9386, #8088
2024-10-22 20:52:30 +00:00
Tristan Partin
fcb55a2aa2 Fix copy-paste error in checkpoints_timed metric
Importing the wrong metric. Sigh...

Signed-off-by: Tristan Partin <tristan@neon.tech>
2024-10-22 14:34:26 -06:00
a-masterov
f36cf3f885 Fix local errors for the tests with the versions mix (#9477)
## Problem
If the environment variables `COMPATIBILITY_NEON_BIN` or
`COMPATIBILITY_POSTGRES_DISTRIB_DIR` are not set (this is usual during a
local run), the tests with the versions mix cannot run.
## Summary of changes
If these variables are not set turn off the version mix.

---------

Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2024-10-22 21:58:55 +02:00
John Spray
8dca188974 storage controller: add metrics for tenant shard, node count (#9475)
## Problem

Previously, figuring out how many tenant shards were managed by a
storage controller was typically done by peeking at the database or
calling into the API. A metric makes it easier to monitor, as
unexpectedly increasing shard counts can be indicative of problems
elsewhere in the system.

## Summary of changes

- Add metrics `storage_controller_pageserver_nodes` (updated on node
CRUD operations from Service) and `storage_controller_tenant_shards`
(updated RAII-style from TenantShard)
2024-10-22 19:43:02 +01:00
Tristan Partin
b7fa93f6b7 Use make's builtin RM variable
At least as far as removing individual files goes, this is the best
pattern for removing. I can't say the same for removing directories, but
I went ahead and changed those to `$(RM) -r` anyway.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2024-10-22 09:14:29 -06:00
Arseny Sher
1e8e04bb2c safekeeper: refactor timeline initialization (#9362)
Always do timeline init through atomic rename of temp directory. Add
GlobalTimelines::load_temp_timeline which does this, and use it from
both pull_timeline and basic timeline creation. Fixes a collection
of issues:
- previously timeline creation didn't really flushed cfile to disk
  due to 'nothing to do if state didn't change' check;
- even if it did, without tmp dir it is possible to lose the cfile
  but leave timeline dir in place, making it look corrupted;
- tenant directory creation fsync was missing in timeline creation;
- pull_timeline is now protected from concurrent both itself and
  timeline creation;
- now global timelines map entry got special CreationInProgress
  entry type which prevents from anyone getting access to timeline
  while it is being created (previously one could get access to it,
  but it was locked during creation, which is valid but confusing if
  creation failed).

fixes #8927
2024-10-22 07:11:36 +01:00
David Gomes
94369af782 chore(compute): bumps pg_session_jwt to latest version (#9474) 2024-10-21 23:39:30 +00:00
Arpad Müller
34b6bd416a offloaded timeline list API (#9461)
Add a way to list the offloaded timelines.

Before, one had to look at logs to figure out if a timeline has been
offloaded or not, or use the non-presence of a certain timeline in the
list of normal timelines. Now, one can list them directly.
 
Part of #8088
2024-10-21 16:33:05 +01:00
Yuchen Liang
49d5e56c08 pageserver: use direct IO for delta and image layer reads (#9326)
Part of #8130 

## Problem

Pageserver previously goes through the kernel page cache for all the
IOs. The kernel page cache makes light-loaded pageserver have deceptive
fast performance. Using direct IO would offer predictable latencies of
our virtual file IO operations.

In particular for reads, the data pages also have an extremely low
temporal locality because the most frequently accessed pages are cached
on the compute side.

## Summary of changes

This PR enables pageserver to use direct IO for delta layer and image
layer reads. We can ship them separately because these layers are
write-once, read-many, so we will not be mixing buffered IO with direct
IO.

- implement `IoBufferMut`, an buffer type with aligned allocation
(currently set to 512).
- use `IoBufferMut` at all places we are doing reads on image + delta
layers.
- leverage Rust type system and use `IoBufAlignedMut` marker trait to
guarantee that the input buffers for the IO operations are aligned.
- page cache allocation is also made aligned.

_* in-memory layer reads and the write path will be shipped separately._

## Testing

Integration test suite run with O_DIRECT enabled:
https://github.com/neondatabase/neon/pull/9350

## Performance

We evaluated performance based on the `get-page-at-latest-lsn`
benchmark. The results demonstrate a decrease in the number of IOps, no
sigificant change in the latency mean, and an slight improvement on the
p99.9 and p99.99 latencies.


[Benchmark](https://www.notion.so/neondatabase/Benchmark-O_DIRECT-for-image-and-delta-layers-2024-10-01-112f189e00478092a195ea5a0137e706?pvs=4)

## Rollout

We will add `virtual_file_io_mode=direct` region by region to enable
direct IO on image + delta layers.

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-21 11:01:25 -04:00
Alex Chi Z.
aca81f5fa4 fix(pageserver): make image split layer writer finish atomic (#8841)
Part of https://github.com/neondatabase/neon/issues/8836

## Summary of changes

This pull request makes the image layer split writer atomic when
finishing the layers. All the produced layers either finish at the same
time, or discard at the same time. Note that this does not prevent
atomicity when crash, but anyways, it will be cleaned up on pageserver
restart.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Christian Schwarz <christian@neon.tech>
2024-10-21 15:59:48 +01:00
Ivan Efremov
2dcac94194 proxy: Use common error interface for error handling with cplane (#9454)
- Remove obsolete error handles.
- Use one source of truth for cplane errors.
#18468
2024-10-21 17:20:09 +03:00
98 changed files with 3363 additions and 1713 deletions

View File

@@ -53,20 +53,6 @@ jobs:
BUILD_TAG: ${{ inputs.build-tag }}
steps:
- name: Fix git ownership
run: |
# Workaround for `fatal: detected dubious ownership in repository at ...`
#
# Use both ${{ github.workspace }} and ${GITHUB_WORKSPACE} because they're different on host and in containers
# Ref https://github.com/actions/checkout/issues/785
#
git config --global --add safe.directory ${{ github.workspace }}
git config --global --add safe.directory ${GITHUB_WORKSPACE}
for r in 14 15 16 17; do
git config --global --add safe.directory "${{ github.workspace }}/vendor/postgres-v$r"
git config --global --add safe.directory "${GITHUB_WORKSPACE}/vendor/postgres-v$r"
done
- uses: actions/checkout@v4
with:
submodules: true

View File

@@ -1078,20 +1078,6 @@ jobs:
runs-on: [ self-hosted, small ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest
steps:
- name: Fix git ownership
run: |
# Workaround for `fatal: detected dubious ownership in repository at ...`
#
# Use both ${{ github.workspace }} and ${GITHUB_WORKSPACE} because they're different on host and in containers
# Ref https://github.com/actions/checkout/issues/785
#
git config --global --add safe.directory ${{ github.workspace }}
git config --global --add safe.directory ${GITHUB_WORKSPACE}
for r in 14 15 16 17; do
git config --global --add safe.directory "${{ github.workspace }}/vendor/postgres-v$r"
git config --global --add safe.directory "${GITHUB_WORKSPACE}/vendor/postgres-v$r"
done
- uses: actions/checkout@v4
- name: Trigger deploy workflow

View File

@@ -297,7 +297,7 @@ clean: postgres-clean neon-pg-clean-ext
# This removes everything
.PHONY: distclean
distclean:
rm -rf $(POSTGRES_INSTALL_DIR)
$(RM) -r $(POSTGRES_INSTALL_DIR)
$(CARGO_CMD_PREFIX) cargo clean
.PHONY: fmt
@@ -329,7 +329,7 @@ postgres-%-pgindent: postgres-%-pg-bsd-indent postgres-%-typedefs.list
$(ROOT_PROJECT_DIR)/vendor/postgres-$*/src/tools/pgindent/pgindent --typedefs postgres-$*-typedefs-full.list \
$(ROOT_PROJECT_DIR)/vendor/postgres-$*/src/ \
--excludes $(ROOT_PROJECT_DIR)/vendor/postgres-$*/src/tools/pgindent/exclude_file_patterns
rm -f pg*.BAK
$(RM) pg*.BAK
# Indent pxgn/neon.
.PHONY: neon-pgindent

View File

@@ -34,7 +34,7 @@ sql_exporter_autoscaling.yml: $(jsonnet_files)
.PHONY: clean
clean:
rm -f \
$(RM) \
etc/neon_collector.yml \
etc/neon_collector_autoscaling.yml \
etc/sql_exporter.yml \

View File

@@ -975,8 +975,8 @@ ARG PG_VERSION
RUN case "${PG_VERSION}" in "v17") \
echo "pg_session_jwt does not yet have a release that supports pg17" && exit 0;; \
esac && \
wget https://github.com/neondatabase/pg_session_jwt/archive/e642528f429dd3f5403845a50191b78d434b84a6.tar.gz -O pg_session_jwt.tar.gz && \
echo "1a69210703cc91224785e59a0a67562dd9eed9a0914ac84b11447582ca0d5b93 pg_session_jwt.tar.gz" | sha256sum --check && \
wget https://github.com/neondatabase/pg_session_jwt/archive/e1310b08ba51377a19e0559e4d1194883b9b2ba2.tar.gz -O pg_session_jwt.tar.gz && \
echo "837932a077888d5545fd54b0abcc79e5f8e37017c2769a930afc2f5c94df6f4e pg_session_jwt.tar.gz" | sha256sum --check && \
mkdir pg_session_jwt-src && cd pg_session_jwt-src && tar xzf ../pg_session_jwt.tar.gz --strip-components=1 -C . && \
sed -i 's/pgrx = "=0.11.3"/pgrx = { version = "=0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
cargo pgrx install --release

View File

@@ -1,7 +1,7 @@
local neon = import 'neon.libsonnet';
local pg_stat_bgwriter = importstr 'sql_exporter/checkpoints_req.sql';
local pg_stat_checkpointer = importstr 'sql_exporter/checkpoints_req.17.sql';
local pg_stat_bgwriter = importstr 'sql_exporter/checkpoints_timed.sql';
local pg_stat_checkpointer = importstr 'sql_exporter/checkpoints_timed.17.sql';
{
metric_name: 'checkpoints_timed',

View File

@@ -1,5 +1,10 @@
SELECT
slot_name,
pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)::FLOAT8 AS retained_wal
pg_wal_lsn_diff(
CASE
WHEN pg_is_in_recovery() THEN pg_last_wal_replay_lsn()
ELSE pg_current_wal_lsn()
END,
restart_lsn)::FLOAT8 AS retained_wal
FROM pg_replication_slots
WHERE active = false;

View File

@@ -20,7 +20,16 @@ use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use postgres_backend::AuthType;
use reqwest::Method;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{fs, net::SocketAddr, path::PathBuf, str::FromStr, sync::OnceLock};
use std::{
ffi::OsStr,
fs,
net::SocketAddr,
path::PathBuf,
process::ExitStatus,
str::FromStr,
sync::OnceLock,
time::{Duration, Instant},
};
use tokio::process::Command;
use tracing::instrument;
use url::Url;
@@ -168,16 +177,6 @@ impl StorageController {
.expect("non-Unicode path")
}
/// PIDFile for the postgres instance used to store storage controller state
fn postgres_pid_file(&self) -> Utf8PathBuf {
Utf8PathBuf::from_path_buf(
self.env
.base_data_dir
.join("storage_controller_postgres.pid"),
)
.expect("non-Unicode path")
}
/// Find the directory containing postgres subdirectories, such `bin` and `lib`
///
/// This usually uses STORAGE_CONTROLLER_POSTGRES_VERSION of postgres, but will fall back
@@ -296,6 +295,31 @@ impl StorageController {
.map_err(anyhow::Error::new)
}
/// Wrapper for the pg_ctl binary, which we spawn as a short-lived subprocess when starting and stopping postgres
async fn pg_ctl<I, S>(&self, args: I) -> ExitStatus
where
I: IntoIterator<Item = S>,
S: AsRef<OsStr>,
{
let pg_bin_dir = self.get_pg_bin_dir().await.unwrap();
let bin_path = pg_bin_dir.join("pg_ctl");
let pg_lib_dir = self.get_pg_lib_dir().await.unwrap();
let envs = [
("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
];
Command::new(bin_path)
.args(args)
.envs(envs)
.spawn()
.expect("Failed to spawn pg_ctl, binary_missing?")
.wait()
.await
.expect("Failed to wait for pg_ctl termination")
}
pub async fn start(&self, start_args: NeonStorageControllerStartArgs) -> anyhow::Result<()> {
let instance_dir = self.storage_controller_instance_dir(start_args.instance_id);
if let Err(err) = tokio::fs::create_dir(&instance_dir).await {
@@ -404,20 +428,34 @@ impl StorageController {
db_start_args
);
background_process::start_process(
"storage_controller_db",
&self.env.base_data_dir,
pg_bin_dir.join("pg_ctl").as_std_path(),
db_start_args,
vec![
("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
],
background_process::InitialPidFile::Create(self.postgres_pid_file()),
&start_args.start_timeout,
|| self.pg_isready(&pg_bin_dir, postgres_port),
)
.await?;
let db_start_status = self.pg_ctl(db_start_args).await;
let start_timeout: Duration = start_args.start_timeout.into();
let db_start_deadline = Instant::now() + start_timeout;
if !db_start_status.success() {
return Err(anyhow::anyhow!(
"Failed to start postgres {}",
db_start_status.code().unwrap()
));
}
loop {
if Instant::now() > db_start_deadline {
return Err(anyhow::anyhow!("Timed out waiting for postgres to start"));
}
match self.pg_isready(&pg_bin_dir, postgres_port).await {
Ok(true) => {
tracing::info!("storage controller postgres is now ready");
break;
}
Ok(false) => {
tokio::time::sleep(Duration::from_millis(100)).await;
}
Err(e) => {
tracing::warn!("Failed to check postgres status: {e}")
}
}
}
self.setup_database(postgres_port).await?;
}
@@ -583,15 +621,10 @@ impl StorageController {
}
let pg_data_path = self.env.base_data_dir.join("storage_controller_db");
let pg_bin_dir = self.get_pg_bin_dir().await?;
println!("Stopping storage controller database...");
let pg_stop_args = ["-D", &pg_data_path.to_string_lossy(), "stop"];
let stop_status = Command::new(pg_bin_dir.join("pg_ctl"))
.args(pg_stop_args)
.spawn()?
.wait()
.await?;
let stop_status = self.pg_ctl(pg_stop_args).await;
if !stop_status.success() {
match self.is_postgres_running().await {
Ok(false) => {
@@ -612,14 +645,9 @@ impl StorageController {
async fn is_postgres_running(&self) -> anyhow::Result<bool> {
let pg_data_path = self.env.base_data_dir.join("storage_controller_db");
let pg_bin_dir = self.get_pg_bin_dir().await?;
let pg_status_args = ["-D", &pg_data_path.to_string_lossy(), "status"];
let status_exitcode = Command::new(pg_bin_dir.join("pg_ctl"))
.args(pg_status_args)
.spawn()?
.wait()
.await?;
let status_exitcode = self.pg_ctl(pg_status_args).await;
// pg_ctl status returns this exit code if postgres is not running: in this case it is
// fine that stop failed. Otherwise it is an error that stop failed.

View File

@@ -684,6 +684,25 @@ pub struct TimelineArchivalConfigRequest {
pub state: TimelineArchivalState,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TimelinesInfoAndOffloaded {
pub timelines: Vec<TimelineInfo>,
pub offloaded: Vec<OffloadedTimelineInfo>,
}
/// Analog of [`TimelineInfo`] for offloaded timelines.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct OffloadedTimelineInfo {
pub tenant_id: TenantShardId,
pub timeline_id: TimelineId,
/// Whether the timeline has a parent it has been branched off from or not
pub ancestor_timeline_id: Option<TimelineId>,
/// Whether to retain the branch lsn at the ancestor or not
pub ancestor_retain_lsn: Option<Lsn>,
/// The time point when the timeline was archived
pub archived_at: chrono::DateTime<chrono::Utc>,
}
/// This represents the output of the "timeline_detail" and "timeline_list" API calls.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TimelineInfo {
@@ -1013,12 +1032,6 @@ pub mod virtual_file {
}
impl IoMode {
#[cfg(target_os = "linux")]
pub const fn preferred() -> Self {
Self::Direct
}
#[cfg(target_os = "macos")]
pub const fn preferred() -> Self {
Self::Buffered
}

View File

@@ -11,7 +11,7 @@ pub(crate) async fn main(cmd: &IndexPartCmd) -> anyhow::Result<()> {
match cmd {
IndexPartCmd::Dump { path } => {
let bytes = tokio::fs::read(path).await.context("read file")?;
let des: IndexPart = IndexPart::from_s3_bytes(&bytes).context("deserialize")?;
let des: IndexPart = IndexPart::from_json_bytes(&bytes).context("deserialize")?;
let output = serde_json::to_string_pretty(&des).context("serialize output")?;
println!("{output}");
Ok(())

View File

@@ -345,6 +345,7 @@ impl AuxFileV2 {
AuxFileV2::Recognized("pg_logical/replorigin_checkpoint", hash)
}
(2, 1) => AuxFileV2::Recognized("pg_replslot/", hash),
(4, 1) => AuxFileV2::Recognized("lfc.state", hash),
(1, 0xff) => AuxFileV2::OtherWithPrefix("pg_logical/", hash),
(0xff, 0xff) => AuxFileV2::Other(hash),
_ => return None,

View File

@@ -39,6 +39,7 @@ fn aux_hash_to_metadata_key(dir_level1: u8, dir_level2: u8, data: &[u8]) -> Key
const AUX_DIR_PG_LOGICAL: u8 = 0x01;
const AUX_DIR_PG_REPLSLOT: u8 = 0x02;
const AUX_DIR_LFC_STATE: u8 = 0x04;
const AUX_DIR_PG_UNKNOWN: u8 = 0xFF;
/// Encode the aux file into a fixed-size key.
@@ -75,6 +76,8 @@ pub fn encode_aux_file_key(path: &str) -> Key {
aux_hash_to_metadata_key(AUX_DIR_PG_LOGICAL, 0xFF, fname.as_bytes())
} else if let Some(fname) = path.strip_prefix("pg_replslot/") {
aux_hash_to_metadata_key(AUX_DIR_PG_REPLSLOT, 0x01, fname.as_bytes())
} else if let Some(fname) = path.strip_prefix("lfc.state") {
aux_hash_to_metadata_key(AUX_DIR_LFC_STATE, 0x01, fname.as_bytes())
} else {
if cfg!(debug_assertions) {
warn!(

View File

@@ -26,6 +26,7 @@ use pageserver_api::models::LocationConfigListResponse;
use pageserver_api::models::LocationConfigMode;
use pageserver_api::models::LsnLease;
use pageserver_api::models::LsnLeaseRequest;
use pageserver_api::models::OffloadedTimelineInfo;
use pageserver_api::models::ShardParameters;
use pageserver_api::models::TenantDetails;
use pageserver_api::models::TenantLocationConfigRequest;
@@ -37,6 +38,7 @@ use pageserver_api::models::TenantShardSplitRequest;
use pageserver_api::models::TenantShardSplitResponse;
use pageserver_api::models::TenantSorting;
use pageserver_api::models::TimelineArchivalConfigRequest;
use pageserver_api::models::TimelinesInfoAndOffloaded;
use pageserver_api::models::TopTenantShardItem;
use pageserver_api::models::TopTenantShardsRequest;
use pageserver_api::models::TopTenantShardsResponse;
@@ -81,6 +83,7 @@ use crate::tenant::timeline::CompactFlags;
use crate::tenant::timeline::CompactionError;
use crate::tenant::timeline::Timeline;
use crate::tenant::GetTimelineError;
use crate::tenant::OffloadedTimeline;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
use crate::{disk_usage_eviction_task, tenant};
use pageserver_api::models::{
@@ -477,6 +480,24 @@ async fn build_timeline_info_common(
Ok(info)
}
fn build_timeline_offloaded_info(offloaded: &Arc<OffloadedTimeline>) -> OffloadedTimelineInfo {
let &OffloadedTimeline {
tenant_shard_id,
timeline_id,
ancestor_retain_lsn,
ancestor_timeline_id,
archived_at,
..
} = offloaded.as_ref();
OffloadedTimelineInfo {
tenant_id: tenant_shard_id,
timeline_id,
ancestor_retain_lsn,
ancestor_timeline_id,
archived_at: archived_at.and_utc(),
}
}
// healthcheck handler
async fn status_handler(
request: Request<Body>,
@@ -643,7 +664,7 @@ async fn timeline_list_handler(
)
.instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
.await
.context("Failed to convert tenant timeline {timeline_id} into the local one: {e:?}")
.context("Failed to build timeline info")
.map_err(ApiError::InternalServerError)?;
response_data.push(timeline_info);
@@ -658,6 +679,62 @@ async fn timeline_list_handler(
json_response(StatusCode::OK, response_data)
}
async fn timeline_and_offloaded_list_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let include_non_incremental_logical_size: Option<bool> =
parse_query_param(&request, "include-non-incremental-logical-size")?;
let force_await_initial_logical_size: Option<bool> =
parse_query_param(&request, "force-await-initial-logical-size")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let response_data = async {
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
let (timelines, offloadeds) = tenant.list_timelines_and_offloaded();
let mut timeline_infos = Vec::with_capacity(timelines.len());
for timeline in timelines {
let timeline_info = build_timeline_info(
&timeline,
include_non_incremental_logical_size.unwrap_or(false),
force_await_initial_logical_size.unwrap_or(false),
&ctx,
)
.instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
.await
.context("Failed to build timeline info")
.map_err(ApiError::InternalServerError)?;
timeline_infos.push(timeline_info);
}
let offloaded_infos = offloadeds
.into_iter()
.map(|offloaded| build_timeline_offloaded_info(&offloaded))
.collect::<Vec<_>>();
let res = TimelinesInfoAndOffloaded {
timelines: timeline_infos,
offloaded: offloaded_infos,
};
Ok::<TimelinesInfoAndOffloaded, ApiError>(res)
}
.instrument(info_span!("timeline_and_offloaded_list",
tenant_id = %tenant_shard_id.tenant_id,
shard_id = %tenant_shard_id.shard_slug()))
.await?;
json_response(StatusCode::OK, response_data)
}
async fn timeline_preserve_initdb_handler(
request: Request<Body>,
_cancel: CancellationToken,
@@ -2993,6 +3070,9 @@ pub fn make_router(
.get("/v1/tenant/:tenant_shard_id/timeline", |r| {
api_handler(r, timeline_list_handler)
})
.get("/v1/tenant/:tenant_shard_id/timeline_and_offloaded", |r| {
api_handler(r, timeline_and_offloaded_list_handler)
})
.post("/v1/tenant/:tenant_shard_id/timeline", |r| {
api_handler(r, timeline_create_handler)
})

View File

@@ -2092,6 +2092,7 @@ pub(crate) struct WalIngestMetrics {
pub(crate) records_received: IntCounter,
pub(crate) records_committed: IntCounter,
pub(crate) records_filtered: IntCounter,
pub(crate) gap_blocks_zeroed_on_rel_extend: IntCounter,
}
pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| WalIngestMetrics {
@@ -2115,6 +2116,11 @@ pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| WalIngestMet
"Number of WAL records filtered out due to sharding"
)
.expect("failed to define a metric"),
gap_blocks_zeroed_on_rel_extend: register_int_counter!(
"pageserver_gap_blocks_zeroed_on_rel_extend",
"Total number of zero gap blocks written on relation extends"
)
.expect("failed to define a metric"),
});
pub(crate) static WAL_REDO_TIME: Lazy<Histogram> = Lazy::new(|| {

View File

@@ -16,6 +16,7 @@ use anyhow::{bail, Context};
use arc_swap::ArcSwap;
use camino::Utf8Path;
use camino::Utf8PathBuf;
use chrono::NaiveDateTime;
use enumset::EnumSet;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
@@ -31,6 +32,10 @@ use pageserver_api::shard::TenantShardId;
use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use remote_storage::TimeoutOrCancel;
use remote_timeline_client::manifest::{
OffloadedTimelineManifest, TenantManifest, LATEST_TENANT_MANIFEST_VERSION,
};
use remote_timeline_client::UploadQueueNotReadyError;
use std::collections::BTreeMap;
use std::fmt;
use std::future::Future;
@@ -65,13 +70,14 @@ use self::config::TenantConf;
use self::metadata::TimelineMetadata;
use self::mgr::GetActiveTenantError;
use self::mgr::GetTenantError;
use self::remote_timeline_client::upload::upload_index_part;
use self::remote_timeline_client::upload::{upload_index_part, upload_tenant_manifest};
use self::remote_timeline_client::{RemoteTimelineClient, WaitCompletionError};
use self::timeline::uninit::TimelineCreateGuard;
use self::timeline::uninit::TimelineExclusionError;
use self::timeline::uninit::UninitializedTimeline;
use self::timeline::EvictionTaskTenantState;
use self::timeline::GcCutoffs;
use self::timeline::TimelineDeleteProgress;
use self::timeline::TimelineResources;
use self::timeline::WaitLsnError;
use crate::config::PageServerConf;
@@ -240,6 +246,7 @@ struct TimelinePreload {
}
pub(crate) struct TenantPreload {
tenant_manifest: TenantManifest,
timelines: HashMap<TimelineId, TimelinePreload>,
}
@@ -488,6 +495,12 @@ impl WalRedoManager {
}
}
/// A very lightweight memory representation of an offloaded timeline.
///
/// We need to store the list of offloaded timelines so that we can perform operations on them,
/// like unoffloading them, or (at a later date), decide to perform flattening.
/// This type has a much smaller memory impact than [`Timeline`], and thus we can store many
/// more offloaded timelines than we can manage ones that aren't.
pub struct OffloadedTimeline {
pub tenant_shard_id: TenantShardId,
pub timeline_id: TimelineId,
@@ -495,27 +508,78 @@ pub struct OffloadedTimeline {
/// Whether to retain the branch lsn at the ancestor or not
pub ancestor_retain_lsn: Option<Lsn>,
// TODO: once we persist offloaded state, make this lazily constructed
pub remote_client: Arc<RemoteTimelineClient>,
/// When the timeline was archived.
///
/// Present for future flattening deliberations.
pub archived_at: NaiveDateTime,
/// Lazily constructed remote client for the timeline
///
/// If we offload a timeline, we keep around the remote client
/// for the duration of the process. If we find it through the
/// manifest, we don't construct it up until it's needed (deletion).
pub remote_client: Option<Arc<RemoteTimelineClient>>,
/// Prevent two tasks from deleting the timeline at the same time. If held, the
/// timeline is being deleted. If 'true', the timeline has already been deleted.
pub delete_progress: Arc<tokio::sync::Mutex<DeleteTimelineFlow>>,
pub delete_progress: TimelineDeleteProgress,
}
impl OffloadedTimeline {
fn from_timeline(timeline: &Timeline) -> Self {
/// Obtains an offloaded timeline from a given timeline object.
///
/// Returns `None` if the `archived_at` flag couldn't be obtained, i.e.
/// the timeline is not in a stopped state.
/// Panics if the timeline is not archived.
fn from_timeline(timeline: &Timeline) -> Result<Self, UploadQueueNotReadyError> {
let ancestor_retain_lsn = timeline
.get_ancestor_timeline_id()
.map(|_timeline_id| timeline.get_ancestor_lsn());
Self {
let archived_at = timeline
.remote_client
.archived_at_stopped_queue()?
.expect("must be called on an archived timeline");
Ok(Self {
tenant_shard_id: timeline.tenant_shard_id,
timeline_id: timeline.timeline_id,
ancestor_timeline_id: timeline.get_ancestor_timeline_id(),
ancestor_retain_lsn,
archived_at,
remote_client: timeline.remote_client.clone(),
remote_client: Some(timeline.remote_client.clone()),
delete_progress: timeline.delete_progress.clone(),
})
}
fn from_manifest(tenant_shard_id: TenantShardId, manifest: &OffloadedTimelineManifest) -> Self {
let OffloadedTimelineManifest {
timeline_id,
ancestor_timeline_id,
ancestor_retain_lsn,
archived_at,
} = *manifest;
Self {
tenant_shard_id,
timeline_id,
ancestor_timeline_id,
ancestor_retain_lsn,
archived_at,
remote_client: None,
delete_progress: TimelineDeleteProgress::default(),
}
}
fn manifest(&self) -> OffloadedTimelineManifest {
let Self {
timeline_id,
ancestor_timeline_id,
ancestor_retain_lsn,
archived_at,
..
} = self;
OffloadedTimelineManifest {
timeline_id: *timeline_id,
ancestor_timeline_id: *ancestor_timeline_id,
ancestor_retain_lsn: *ancestor_retain_lsn,
archived_at: *archived_at,
}
}
}
@@ -551,10 +615,19 @@ impl TimelineOrOffloaded {
TimelineOrOffloaded::Offloaded(offloaded) => &offloaded.delete_progress,
}
}
pub fn remote_client(&self) -> &Arc<RemoteTimelineClient> {
pub fn remote_client_maybe_construct(&self, tenant: &Tenant) -> Arc<RemoteTimelineClient> {
match self {
TimelineOrOffloaded::Timeline(timeline) => &timeline.remote_client,
TimelineOrOffloaded::Offloaded(offloaded) => &offloaded.remote_client,
TimelineOrOffloaded::Timeline(timeline) => timeline.remote_client.clone(),
TimelineOrOffloaded::Offloaded(offloaded) => match offloaded.remote_client.clone() {
Some(remote_client) => remote_client,
None => {
let remote_client = tenant.build_timeline_client(
offloaded.timeline_id,
tenant.remote_storage.clone(),
);
Arc::new(remote_client)
}
},
}
}
}
@@ -796,7 +869,7 @@ impl Tenant {
&self,
timeline_id: TimelineId,
resources: TimelineResources,
index_part: Option<IndexPart>,
index_part: IndexPart,
metadata: TimelineMetadata,
ancestor: Option<Arc<Timeline>>,
_ctx: &RequestContext,
@@ -821,24 +894,7 @@ impl Tenant {
"these are used interchangeably"
);
if let Some(index_part) = index_part.as_ref() {
timeline.remote_client.init_upload_queue(index_part)?;
} else {
// No data on the remote storage, but we have local metadata file. We can end up
// here with timeline_create being interrupted before finishing index part upload.
// By doing what we do here, the index part upload is retried.
// If control plane retries timeline creation in the meantime, the mgmt API handler
// for timeline creation will coalesce on the upload we queue here.
// FIXME: this branch should be dead code as we no longer write local metadata.
timeline
.remote_client
.init_upload_queue_for_empty_remote(&metadata)?;
timeline
.remote_client
.schedule_index_upload_for_full_metadata_update(&metadata)?;
}
timeline.remote_client.init_upload_queue(&index_part)?;
timeline
.load_layer_map(disk_consistent_lsn, index_part)
@@ -1131,14 +1187,35 @@ impl Tenant {
cancel.clone(),
)
.await?;
let (offloaded_add, tenant_manifest) =
match remote_timeline_client::do_download_tenant_manifest(
remote_storage,
&self.tenant_shard_id,
&cancel,
)
.await
{
Ok((tenant_manifest, _generation)) => (
format!("{} offloaded", tenant_manifest.offloaded_timelines.len()),
tenant_manifest,
),
Err(DownloadError::NotFound) => {
("no manifest".to_string(), TenantManifest::empty())
}
Err(e) => Err(e)?,
};
info!("found {} timelines", remote_timeline_ids.len(),);
info!(
"found {} timelines, and {offloaded_add}",
remote_timeline_ids.len()
);
for k in other_keys {
warn!("Unexpected non timeline key {k}");
}
Ok(TenantPreload {
tenant_manifest,
timelines: self
.load_timelines_metadata(remote_timeline_ids, remote_storage, cancel)
.await?,
@@ -1163,12 +1240,26 @@ impl Tenant {
anyhow::bail!("local-only deployment is no longer supported, https://github.com/neondatabase/neon/issues/5624");
};
let mut offloaded_timeline_ids = HashSet::new();
let mut offloaded_timelines_list = Vec::new();
for timeline_manifest in preload.tenant_manifest.offloaded_timelines.iter() {
let timeline_id = timeline_manifest.timeline_id;
let offloaded_timeline =
OffloadedTimeline::from_manifest(self.tenant_shard_id, timeline_manifest);
offloaded_timelines_list.push((timeline_id, Arc::new(offloaded_timeline)));
offloaded_timeline_ids.insert(timeline_id);
}
let mut timelines_to_resume_deletions = vec![];
let mut remote_index_and_client = HashMap::new();
let mut timeline_ancestors = HashMap::new();
let mut existent_timelines = HashSet::new();
for (timeline_id, preload) in preload.timelines {
if offloaded_timeline_ids.remove(&timeline_id) {
// The timeline is offloaded, skip loading it.
continue;
}
let index_part = match preload.index_part {
Ok(i) => {
debug!("remote index part exists for timeline {timeline_id}");
@@ -1272,6 +1363,43 @@ impl Tenant {
.context("resume_deletion")
.map_err(LoadLocalTimelineError::ResumeDeletion)?;
}
// Complete deletions for offloaded timeline id's.
offloaded_timelines_list
.retain(|(offloaded_id, _offloaded)| {
// At this point, offloaded_timeline_ids has the list of all offloaded timelines
// without a prefix in S3, so they are inexistent.
// In the end, existence of a timeline is finally determined by the existence of an index-part.json in remote storage.
// If there is a dangling reference in another location, they need to be cleaned up.
let delete = offloaded_timeline_ids.contains(offloaded_id);
if delete {
tracing::info!("Removing offloaded timeline {offloaded_id} from manifest as no remote prefix was found");
}
!delete
});
if !offloaded_timelines_list.is_empty() {
tracing::info!(
"Tenant has {} offloaded timelines",
offloaded_timelines_list.len()
);
}
{
let mut offloaded_timelines_accessor = self.timelines_offloaded.lock().unwrap();
offloaded_timelines_accessor.extend(offloaded_timelines_list.into_iter());
}
if !offloaded_timeline_ids.is_empty() {
let manifest = self.tenant_manifest();
// TODO: generation support
let generation = remote_timeline_client::TENANT_MANIFEST_GENERATION;
upload_tenant_manifest(
&self.remote_storage,
&self.tenant_shard_id,
generation,
&manifest,
&self.cancel,
)
.await
.map_err(TimelineArchivalError::Other)?;
}
// The local filesystem contents are a cache of what's in the remote IndexPart;
// IndexPart is the source of truth.
@@ -1396,7 +1524,7 @@ impl Tenant {
self.timeline_init_and_sync(
timeline_id,
resources,
Some(index_part),
index_part,
remote_metadata,
ancestor,
ctx,
@@ -1443,20 +1571,28 @@ impl Tenant {
Ok(timeline_preloads)
}
fn load_timeline_metadata(
self: &Arc<Tenant>,
fn build_timeline_client(
&self,
timeline_id: TimelineId,
remote_storage: GenericRemoteStorage,
cancel: CancellationToken,
) -> impl Future<Output = TimelinePreload> {
let client = RemoteTimelineClient::new(
) -> RemoteTimelineClient {
RemoteTimelineClient::new(
remote_storage.clone(),
self.deletion_queue_client.clone(),
self.conf,
self.tenant_shard_id,
timeline_id,
self.generation,
);
)
}
fn load_timeline_metadata(
self: &Arc<Tenant>,
timeline_id: TimelineId,
remote_storage: GenericRemoteStorage,
cancel: CancellationToken,
) -> impl Future<Output = TimelinePreload> {
let client = self.build_timeline_client(timeline_id, remote_storage);
async move {
debug_assert_current_span_has_tenant_and_timeline_id();
debug!("starting index part download");
@@ -1547,7 +1683,7 @@ impl Tenant {
info!("unoffloading timeline");
let cancel = self.cancel.clone();
let timeline_preload = self
.load_timeline_metadata(timeline_id, self.remote_storage.clone(), cancel)
.load_timeline_metadata(timeline_id, self.remote_storage.clone(), cancel.clone())
.await;
let index_part = match timeline_preload.index_part {
@@ -1592,17 +1728,37 @@ impl Tenant {
)
})
.map_err(TimelineArchivalError::Other)?;
let timelines = self.timelines.lock().unwrap();
let Some(timeline) = timelines.get(&timeline_id) else {
warn!("timeline not available directly after attach");
return Err(TimelineArchivalError::Other(anyhow::anyhow!(
"timeline not available directly after attach"
)));
let timeline = {
let timelines = self.timelines.lock().unwrap();
let Some(timeline) = timelines.get(&timeline_id) else {
warn!("timeline not available directly after attach");
// This is not a panic because no locks are held between `load_remote_timeline`
// which puts the timeline into timelines, and our look into the timeline map.
return Err(TimelineArchivalError::Other(anyhow::anyhow!(
"timeline not available directly after attach"
)));
};
let mut offloaded_timelines = self.timelines_offloaded.lock().unwrap();
if offloaded_timelines.remove(&timeline_id).is_none() {
warn!("timeline already removed from offloaded timelines");
}
Arc::clone(timeline)
};
let mut offloaded_timelines = self.timelines_offloaded.lock().unwrap();
if offloaded_timelines.remove(&timeline_id).is_none() {
warn!("timeline already removed from offloaded timelines");
}
// Upload new list of offloaded timelines to S3
let manifest = self.tenant_manifest();
// TODO: generation support
let generation = remote_timeline_client::TENANT_MANIFEST_GENERATION;
upload_tenant_manifest(
&self.remote_storage,
&self.tenant_shard_id,
generation,
&manifest,
&cancel,
)
.await
.map_err(TimelineArchivalError::Other)?;
// Activate the timeline (if it makes sense)
if !(timeline.is_broken() || timeline.is_stopping()) {
@@ -1616,7 +1772,7 @@ impl Tenant {
}
info!("timeline unoffloading complete");
Ok(Arc::clone(timeline))
Ok(timeline)
}
pub(crate) async fn apply_timeline_archival_config(
@@ -1755,7 +1911,7 @@ impl Tenant {
}
/// Lists timelines the tenant contains.
/// Up to tenant's implementation to omit certain timelines that ar not considered ready for use.
/// It's up to callers to omit certain timelines that are not considered ready for use.
pub fn list_timelines(&self) -> Vec<Arc<Timeline>> {
self.timelines
.lock()
@@ -1765,6 +1921,29 @@ impl Tenant {
.collect()
}
/// Lists timelines the tenant manages, including offloaded ones.
///
/// It's up to callers to omit certain timelines that are not considered ready for use.
pub fn list_timelines_and_offloaded(
&self,
) -> (Vec<Arc<Timeline>>, Vec<Arc<OffloadedTimeline>>) {
let timelines = self
.timelines
.lock()
.unwrap()
.values()
.map(Arc::clone)
.collect();
let offloaded = self
.timelines_offloaded
.lock()
.unwrap()
.values()
.map(Arc::clone)
.collect();
(timelines, offloaded)
}
pub fn list_timeline_ids(&self) -> Vec<TimelineId> {
self.timelines.lock().unwrap().keys().cloned().collect()
}
@@ -2770,6 +2949,26 @@ impl Tenant {
}
}
// TODO: also copy index files of offloaded timelines
let tenant_manifest = self.tenant_manifest();
// TODO: generation support
let generation = remote_timeline_client::TENANT_MANIFEST_GENERATION;
for child_shard in child_shards {
tracing::info!(
"Uploading tenant manifest for child {}",
child_shard.to_index()
);
upload_tenant_manifest(
&self.remote_storage,
child_shard,
generation,
&tenant_manifest,
&self.cancel,
)
.await?;
}
Ok(())
}
@@ -2947,6 +3146,22 @@ impl Tenant {
.unwrap_or(self.conf.default_tenant_conf.lsn_lease_length)
}
pub(crate) fn tenant_manifest(&self) -> TenantManifest {
let timelines_offloaded = self.timelines_offloaded.lock().unwrap();
let mut timeline_manifests = timelines_offloaded
.iter()
.map(|(_timeline_id, offloaded)| offloaded.manifest())
.collect::<Vec<_>>();
// Sort the manifests so that our output is deterministic
timeline_manifests.sort_by_key(|timeline_manifest| timeline_manifest.timeline_id);
TenantManifest {
version: LATEST_TENANT_MANIFEST_VERSION,
offloaded_timelines: timeline_manifests,
}
}
pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
// Use read-copy-update in order to avoid overwriting the location config
// state if this races with [`Tenant::set_new_location_config`]. Note that
@@ -3939,18 +4154,21 @@ impl Tenant {
Ok(timeline)
}
/// Call this before constructing a timeline, to build its required structures
fn build_timeline_resources(&self, timeline_id: TimelineId) -> TimelineResources {
let remote_client = RemoteTimelineClient::new(
fn build_timeline_remote_client(&self, timeline_id: TimelineId) -> RemoteTimelineClient {
RemoteTimelineClient::new(
self.remote_storage.clone(),
self.deletion_queue_client.clone(),
self.conf,
self.tenant_shard_id,
timeline_id,
self.generation,
);
)
}
/// Call this before constructing a timeline, to build its required structures
fn build_timeline_resources(&self, timeline_id: TimelineId) -> TimelineResources {
TimelineResources {
remote_client,
remote_client: self.build_timeline_remote_client(timeline_id),
timeline_get_throttle: self.timeline_get_throttle.clone(),
l0_flush_global_state: self.l0_flush_global_state.clone(),
}

View File

@@ -180,6 +180,7 @@
pub(crate) mod download;
pub mod index;
pub mod manifest;
pub(crate) mod upload;
use anyhow::Context;
@@ -191,7 +192,6 @@ use pageserver_api::models::TimelineArchivalState;
use pageserver_api::shard::{ShardIndex, TenantShardId};
use scopeguard::ScopeGuard;
use tokio_util::sync::CancellationToken;
pub(crate) use upload::upload_initdb_dir;
use utils::backoff::{
self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
};
@@ -245,9 +245,11 @@ use super::upload_queue::{NotInitialized, SetDeletedFlagProgress};
use super::Generation;
pub(crate) use download::{
download_index_part, is_temp_download_file, list_remote_tenant_shards, list_remote_timelines,
do_download_tenant_manifest, download_index_part, is_temp_download_file,
list_remote_tenant_shards, list_remote_timelines,
};
pub(crate) use index::LayerFileMetadata;
pub(crate) use upload::{upload_initdb_dir, upload_tenant_manifest};
// Occasional network issues and such can cause remote operations to fail, and
// that's expected. If a download fails, we log it at info-level, and retry.
@@ -272,6 +274,12 @@ pub(crate) const BUFFER_SIZE: usize = 32 * 1024;
/// which we warn and skip.
const DELETION_QUEUE_FLUSH_TIMEOUT: Duration = Duration::from_secs(10);
/// Hardcode a generation for the tenant manifest for now so that we don't
/// need to deal with generation-less manifests in the future.
///
/// TODO: add proper generation support to all the places that use this.
pub(crate) const TENANT_MANIFEST_GENERATION: Generation = Generation::new(1);
pub enum MaybeDeletedIndexPart {
IndexPart(IndexPart),
Deleted(IndexPart),
@@ -295,6 +303,10 @@ pub enum WaitCompletionError {
UploadQueueShutDownOrStopped,
}
#[derive(Debug, thiserror::Error)]
#[error("Upload queue either in unexpected state or hasn't downloaded manifest yet")]
pub struct UploadQueueNotReadyError;
/// A client for accessing a timeline's data in remote storage.
///
/// This takes care of managing the number of connections, and balancing them
@@ -468,6 +480,20 @@ impl RemoteTimelineClient {
.ok()
}
/// Returns `Ok(Some(timestamp))` if the timeline has been archived, `Ok(None)` if the timeline hasn't been archived.
///
/// Return Err(_) if the remote index_part hasn't been downloaded yet, or the timeline hasn't been stopped yet.
pub(crate) fn archived_at_stopped_queue(
&self,
) -> Result<Option<NaiveDateTime>, UploadQueueNotReadyError> {
self.upload_queue
.lock()
.unwrap()
.stopped_mut()
.map(|q| q.upload_queue_for_deletion.clean.0.archived_at)
.map_err(|_| UploadQueueNotReadyError)
}
fn update_remote_physical_size_gauge(&self, current_remote_index_part: Option<&IndexPart>) {
let size: u64 = if let Some(current_remote_index_part) = current_remote_index_part {
current_remote_index_part
@@ -2198,6 +2224,17 @@ pub fn remote_tenant_path(tenant_shard_id: &TenantShardId) -> RemotePath {
RemotePath::from_string(&path).expect("Failed to construct path")
}
pub fn remote_tenant_manifest_path(
tenant_shard_id: &TenantShardId,
generation: Generation,
) -> RemotePath {
let path = format!(
"tenants/{tenant_shard_id}/tenant-manifest{}.json",
generation.get_suffix()
);
RemotePath::from_string(&path).expect("Failed to construct path")
}
pub fn remote_timelines_path(tenant_shard_id: &TenantShardId) -> RemotePath {
let path = format!("tenants/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}");
RemotePath::from_string(&path).expect("Failed to construct path")

View File

@@ -34,10 +34,11 @@ use utils::id::{TenantId, TimelineId};
use utils::pausable_failpoint;
use super::index::{IndexPart, LayerFileMetadata};
use super::manifest::TenantManifest;
use super::{
parse_remote_index_path, remote_index_path, remote_initdb_archive_path,
remote_initdb_preserved_archive_path, remote_tenant_path, FAILED_DOWNLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES, INITDB_PATH,
remote_initdb_preserved_archive_path, remote_tenant_manifest_path, remote_tenant_path,
FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, INITDB_PATH,
};
///
@@ -338,19 +339,15 @@ pub async fn list_remote_timelines(
list_identifiers::<TimelineId>(storage, remote_path, cancel).await
}
async fn do_download_index_part(
async fn do_download_remote_path_retry_forever(
storage: &GenericRemoteStorage,
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
index_generation: Generation,
remote_path: &RemotePath,
cancel: &CancellationToken,
) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
let (index_part_bytes, index_part_mtime) = download_retry_forever(
) -> Result<(Vec<u8>, SystemTime), DownloadError> {
download_retry_forever(
|| async {
let download = storage
.download(&remote_path, &DownloadOpts::default(), cancel)
.download(remote_path, &DownloadOpts::default(), cancel)
.await?;
let mut bytes = Vec::new();
@@ -365,7 +362,39 @@ async fn do_download_index_part(
&format!("download {remote_path:?}"),
cancel,
)
.await?;
.await
}
pub async fn do_download_tenant_manifest(
storage: &GenericRemoteStorage,
tenant_shard_id: &TenantShardId,
cancel: &CancellationToken,
) -> Result<(TenantManifest, Generation), DownloadError> {
// TODO: generation support
let generation = super::TENANT_MANIFEST_GENERATION;
let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation);
let (manifest_bytes, _manifest_bytes_mtime) =
do_download_remote_path_retry_forever(storage, &remote_path, cancel).await?;
let tenant_manifest = TenantManifest::from_json_bytes(&manifest_bytes)
.with_context(|| format!("deserialize tenant manifest file at {remote_path:?}"))
.map_err(DownloadError::Other)?;
Ok((tenant_manifest, generation))
}
async fn do_download_index_part(
storage: &GenericRemoteStorage,
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
index_generation: Generation,
cancel: &CancellationToken,
) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
let (index_part_bytes, index_part_mtime) =
do_download_remote_path_retry_forever(storage, &remote_path, cancel).await?;
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
.with_context(|| format!("deserialize index part file at {remote_path:?}"))

View File

@@ -121,11 +121,11 @@ impl IndexPart {
self.disk_consistent_lsn
}
pub fn from_s3_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
pub fn from_json_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
serde_json::from_slice::<IndexPart>(bytes)
}
pub fn to_s3_bytes(&self) -> serde_json::Result<Vec<u8>> {
pub fn to_json_bytes(&self) -> serde_json::Result<Vec<u8>> {
serde_json::to_vec(self)
}
@@ -383,7 +383,7 @@ mod tests {
last_aux_file_policy: None,
};
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
@@ -427,7 +427,7 @@ mod tests {
last_aux_file_policy: None,
};
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
@@ -472,7 +472,7 @@ mod tests {
last_aux_file_policy: None,
};
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
@@ -520,7 +520,7 @@ mod tests {
last_aux_file_policy: None,
};
let empty_layers_parsed = IndexPart::from_s3_bytes(empty_layers_json.as_bytes()).unwrap();
let empty_layers_parsed = IndexPart::from_json_bytes(empty_layers_json.as_bytes()).unwrap();
assert_eq!(empty_layers_parsed, expected);
}
@@ -563,7 +563,7 @@ mod tests {
last_aux_file_policy: None,
};
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
@@ -609,7 +609,7 @@ mod tests {
last_aux_file_policy: None,
};
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
@@ -660,7 +660,7 @@ mod tests {
last_aux_file_policy: Some(AuxFilePolicy::V2),
};
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
@@ -716,7 +716,7 @@ mod tests {
last_aux_file_policy: Default::default(),
};
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
@@ -773,7 +773,7 @@ mod tests {
last_aux_file_policy: Default::default(),
};
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
@@ -835,7 +835,7 @@ mod tests {
archived_at: None,
};
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}

View File

@@ -0,0 +1,53 @@
use chrono::NaiveDateTime;
use serde::{Deserialize, Serialize};
use utils::{id::TimelineId, lsn::Lsn};
/// Tenant-shard scoped manifest
#[derive(Clone, Serialize, Deserialize)]
pub struct TenantManifest {
/// Debugging aid describing the version of this manifest.
/// Can also be used for distinguishing breaking changes later on.
pub version: usize,
/// The list of offloaded timelines together with enough information
/// to not have to actually load them.
///
/// Note: the timelines mentioned in this list might be deleted, i.e.
/// we don't hold an invariant that the references aren't dangling.
/// Existence of index-part.json is the actual indicator of timeline existence.
pub offloaded_timelines: Vec<OffloadedTimelineManifest>,
}
/// The remote level representation of an offloaded timeline.
///
/// Very similar to [`pageserver_api::models::OffloadedTimelineInfo`],
/// but the two datastructures serve different needs, this is for a persistent disk format
/// that must be backwards compatible, while the other is only for informative purposes.
#[derive(Clone, Serialize, Deserialize, Copy)]
pub struct OffloadedTimelineManifest {
pub timeline_id: TimelineId,
/// Whether the timeline has a parent it has been branched off from or not
pub ancestor_timeline_id: Option<TimelineId>,
/// Whether to retain the branch lsn at the ancestor or not
pub ancestor_retain_lsn: Option<Lsn>,
/// The time point when the timeline was archived
pub archived_at: NaiveDateTime,
}
pub const LATEST_TENANT_MANIFEST_VERSION: usize = 1;
impl TenantManifest {
pub(crate) fn empty() -> Self {
Self {
version: LATEST_TENANT_MANIFEST_VERSION,
offloaded_timelines: vec![],
}
}
pub(crate) fn from_json_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
serde_json::from_slice::<Self>(bytes)
}
pub(crate) fn to_json_bytes(&self) -> serde_json::Result<Vec<u8>> {
serde_json::to_vec(self)
}
}

View File

@@ -13,9 +13,11 @@ use tokio_util::sync::CancellationToken;
use utils::{backoff, pausable_failpoint};
use super::index::IndexPart;
use super::manifest::TenantManifest;
use super::Generation;
use crate::tenant::remote_timeline_client::{
remote_index_path, remote_initdb_archive_path, remote_initdb_preserved_archive_path,
remote_tenant_manifest_path,
};
use remote_storage::{GenericRemoteStorage, RemotePath, TimeTravelError};
use utils::id::{TenantId, TimelineId};
@@ -39,7 +41,7 @@ pub(crate) async fn upload_index_part<'a>(
pausable_failpoint!("before-upload-index-pausable");
// FIXME: this error comes too late
let serialized = index_part.to_s3_bytes()?;
let serialized = index_part.to_json_bytes()?;
let serialized = Bytes::from(serialized);
let index_part_size = serialized.len();
@@ -55,6 +57,37 @@ pub(crate) async fn upload_index_part<'a>(
.await
.with_context(|| format!("upload index part for '{tenant_shard_id} / {timeline_id}'"))
}
/// Serializes and uploads the given tenant manifest data to the remote storage.
pub(crate) async fn upload_tenant_manifest(
storage: &GenericRemoteStorage,
tenant_shard_id: &TenantShardId,
generation: Generation,
tenant_manifest: &TenantManifest,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
tracing::trace!("uploading new tenant manifest");
fail_point!("before-upload-manifest", |_| {
bail!("failpoint before-upload-manifest")
});
pausable_failpoint!("before-upload-manifest-pausable");
let serialized = tenant_manifest.to_json_bytes()?;
let serialized = Bytes::from(serialized);
let tenant_manifest_site = serialized.len();
let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation);
storage
.upload_storage_object(
futures::stream::once(futures::future::ready(Ok(serialized))),
tenant_manifest_site,
&remote_path,
cancel,
)
.await
.with_context(|| format!("upload tenant manifest for '{tenant_shard_id}'"))
}
/// Attempts to upload given layer files.
/// No extra checks for overlapping files is made and any files that are already present remotely will be overwritten, if submitted during the upload.

View File

@@ -1,5 +1,6 @@
//! Common traits and structs for layers
pub mod batch_split_writer;
pub mod delta_layer;
pub mod filter_iterator;
pub mod image_layer;
@@ -8,7 +9,6 @@ pub(crate) mod layer;
mod layer_desc;
mod layer_name;
pub mod merge_iterator;
pub mod split_writer;
use crate::context::{AccessStatsBehavior, RequestContext};
use crate::repository::Value;

View File

@@ -12,41 +12,154 @@ use super::{
DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
};
pub(crate) enum SplitWriterResult {
pub(crate) enum BatchWriterResult {
Produced(ResidentLayer),
Discarded(PersistentLayerKey),
}
#[cfg(test)]
impl SplitWriterResult {
impl BatchWriterResult {
fn into_resident_layer(self) -> ResidentLayer {
match self {
SplitWriterResult::Produced(layer) => layer,
SplitWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
BatchWriterResult::Produced(layer) => layer,
BatchWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
}
}
fn into_discarded_layer(self) -> PersistentLayerKey {
match self {
SplitWriterResult::Produced(_) => panic!("unexpected produced layer"),
SplitWriterResult::Discarded(layer) => layer,
BatchWriterResult::Produced(_) => panic!("unexpected produced layer"),
BatchWriterResult::Discarded(layer) => layer,
}
}
}
enum LayerWriterWrapper {
Image(ImageLayerWriter),
Delta(DeltaLayerWriter),
}
/// An layer writer that takes unfinished layers and finish them atomically.
#[must_use]
pub struct BatchLayerWriter {
generated_layer_writers: Vec<(LayerWriterWrapper, PersistentLayerKey)>,
conf: &'static PageServerConf,
}
impl BatchLayerWriter {
pub async fn new(conf: &'static PageServerConf) -> anyhow::Result<Self> {
Ok(Self {
generated_layer_writers: Vec::new(),
conf,
})
}
pub fn add_unfinished_image_writer(
&mut self,
writer: ImageLayerWriter,
key_range: Range<Key>,
lsn: Lsn,
) {
self.generated_layer_writers.push((
LayerWriterWrapper::Image(writer),
PersistentLayerKey {
key_range,
lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
is_delta: false,
},
));
}
pub fn add_unfinished_delta_writer(
&mut self,
writer: DeltaLayerWriter,
key_range: Range<Key>,
lsn_range: Range<Lsn>,
) {
self.generated_layer_writers.push((
LayerWriterWrapper::Delta(writer),
PersistentLayerKey {
key_range,
lsn_range,
is_delta: true,
},
));
}
pub(crate) async fn finish_with_discard_fn<D, F>(
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
discard_fn: D,
) -> anyhow::Result<Vec<BatchWriterResult>>
where
D: Fn(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
{
let Self {
generated_layer_writers,
..
} = self;
let clean_up_layers = |generated_layers: Vec<BatchWriterResult>| {
for produced_layer in generated_layers {
if let BatchWriterResult::Produced(resident_layer) = produced_layer {
let layer: Layer = resident_layer.into();
layer.delete_on_drop();
}
}
};
// BEGIN: catch every error and do the recovery in the below section
let mut generated_layers: Vec<BatchWriterResult> = Vec::new();
for (inner, layer_key) in generated_layer_writers {
if discard_fn(&layer_key).await {
generated_layers.push(BatchWriterResult::Discarded(layer_key));
} else {
let res = match inner {
LayerWriterWrapper::Delta(writer) => {
writer.finish(layer_key.key_range.end, ctx).await
}
LayerWriterWrapper::Image(writer) => {
writer
.finish_with_end_key(layer_key.key_range.end, ctx)
.await
}
};
let layer = match res {
Ok((desc, path)) => {
match Layer::finish_creating(self.conf, tline, desc, &path) {
Ok(layer) => layer,
Err(e) => {
tokio::fs::remove_file(&path).await.ok();
clean_up_layers(generated_layers);
return Err(e);
}
}
}
Err(e) => {
// Image/DeltaLayerWriter::finish will clean up the temporary layer if anything goes wrong,
// so we don't need to remove the layer we just failed to create by ourselves.
clean_up_layers(generated_layers);
return Err(e);
}
};
generated_layers.push(BatchWriterResult::Produced(layer));
}
}
// END: catch every error and do the recovery in the above section
Ok(generated_layers)
}
}
/// An image writer that takes images and produces multiple image layers.
///
/// The interface does not guarantee atomicity (i.e., if the image layer generation
/// fails, there might be leftover files to be cleaned up)
#[must_use]
pub struct SplitImageLayerWriter {
inner: ImageLayerWriter,
target_layer_size: u64,
generated_layers: Vec<SplitWriterResult>,
lsn: Lsn,
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
lsn: Lsn,
batches: BatchLayerWriter,
start_key: Key,
}
@@ -71,27 +184,21 @@ impl SplitImageLayerWriter {
ctx,
)
.await?,
generated_layers: Vec::new(),
conf,
timeline_id,
tenant_shard_id,
batches: BatchLayerWriter::new(conf).await?,
lsn,
start_key,
})
}
pub async fn put_image_with_discard_fn<D, F>(
pub async fn put_image(
&mut self,
key: Key,
img: Bytes,
tline: &Arc<Timeline>,
ctx: &RequestContext,
discard: D,
) -> anyhow::Result<()>
where
D: FnOnce(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
{
) -> anyhow::Result<()> {
// The current estimation is an upper bound of the space that the key/image could take
// because we did not consider compression in this estimation. The resulting image layer
// could be smaller than the target size.
@@ -109,72 +216,34 @@ impl SplitImageLayerWriter {
)
.await?;
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
let layer_key = PersistentLayerKey {
key_range: self.start_key..key,
lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
is_delta: false,
};
self.batches.add_unfinished_image_writer(
prev_image_writer,
self.start_key..key,
self.lsn,
);
self.start_key = key;
if discard(&layer_key).await {
drop(prev_image_writer);
self.generated_layers
.push(SplitWriterResult::Discarded(layer_key));
} else {
let (desc, path) = prev_image_writer.finish_with_end_key(key, ctx).await?;
let layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
self.generated_layers
.push(SplitWriterResult::Produced(layer));
}
}
self.inner.put_image(key, img, ctx).await
}
#[cfg(test)]
pub async fn put_image(
&mut self,
key: Key,
img: Bytes,
tline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
self.put_image_with_discard_fn(key, img, tline, ctx, |_| async { false })
.await
}
pub(crate) async fn finish_with_discard_fn<D, F>(
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Key,
discard: D,
) -> anyhow::Result<Vec<SplitWriterResult>>
discard_fn: D,
) -> anyhow::Result<Vec<BatchWriterResult>>
where
D: FnOnce(&PersistentLayerKey) -> F,
D: Fn(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
{
let Self {
mut generated_layers,
inner,
..
mut batches, inner, ..
} = self;
if inner.num_keys() == 0 {
return Ok(generated_layers);
if inner.num_keys() != 0 {
batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
}
let layer_key = PersistentLayerKey {
key_range: self.start_key..end_key,
lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
is_delta: false,
};
if discard(&layer_key).await {
generated_layers.push(SplitWriterResult::Discarded(layer_key));
} else {
let (desc, path) = inner.finish_with_end_key(end_key, ctx).await?;
let layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
generated_layers.push(SplitWriterResult::Produced(layer));
}
Ok(generated_layers)
batches.finish_with_discard_fn(tline, ctx, discard_fn).await
}
#[cfg(test)]
@@ -183,22 +252,14 @@ impl SplitImageLayerWriter {
tline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Key,
) -> anyhow::Result<Vec<SplitWriterResult>> {
) -> anyhow::Result<Vec<BatchWriterResult>> {
self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
.await
}
/// This function will be deprecated with #8841.
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, ImageLayerWriter)> {
Ok((self.generated_layers, self.inner))
}
}
/// A delta writer that takes key-lsn-values and produces multiple delta layers.
///
/// The interface does not guarantee atomicity (i.e., if the delta layer generation fails,
/// there might be leftover files to be cleaned up).
///
/// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
/// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
/// will split them into multiple files based on size.
@@ -206,12 +267,12 @@ impl SplitImageLayerWriter {
pub struct SplitDeltaLayerWriter {
inner: Option<(Key, DeltaLayerWriter)>,
target_layer_size: u64,
generated_layers: Vec<SplitWriterResult>,
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
lsn_range: Range<Lsn>,
last_key_written: Key,
batches: BatchLayerWriter,
}
impl SplitDeltaLayerWriter {
@@ -225,29 +286,22 @@ impl SplitDeltaLayerWriter {
Ok(Self {
target_layer_size,
inner: None,
generated_layers: Vec::new(),
conf,
timeline_id,
tenant_shard_id,
lsn_range,
last_key_written: Key::MIN,
batches: BatchLayerWriter::new(conf).await?,
})
}
/// Put value into the layer writer. In the case the writer decides to produce a layer, and the discard fn returns true, no layer will be written in the end.
pub async fn put_value_with_discard_fn<D, F>(
pub async fn put_value(
&mut self,
key: Key,
lsn: Lsn,
val: Value,
tline: &Arc<Timeline>,
ctx: &RequestContext,
discard: D,
) -> anyhow::Result<()>
where
D: FnOnce(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
{
) -> anyhow::Result<()> {
// The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
// number, and therefore the final layer size could be a little bit larger or smaller than the target.
//
@@ -286,21 +340,11 @@ impl SplitDeltaLayerWriter {
.await?;
let (start_key, prev_delta_writer) =
std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
let layer_key = PersistentLayerKey {
key_range: start_key..key,
lsn_range: self.lsn_range.clone(),
is_delta: true,
};
if discard(&layer_key).await {
drop(prev_delta_writer);
self.generated_layers
.push(SplitWriterResult::Discarded(layer_key));
} else {
let (desc, path) = prev_delta_writer.finish(key, ctx).await?;
let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
self.generated_layers
.push(SplitWriterResult::Produced(delta_layer));
}
self.batches.add_unfinished_delta_writer(
prev_delta_writer,
start_key..key,
self.lsn_range.clone(),
);
} else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
// We have to produce a very large file b/c a key is updated too often.
anyhow::bail!(
@@ -315,53 +359,30 @@ impl SplitDeltaLayerWriter {
inner.put_value(key, lsn, val, ctx).await
}
pub async fn put_value(
&mut self,
key: Key,
lsn: Lsn,
val: Value,
tline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
self.put_value_with_discard_fn(key, lsn, val, tline, ctx, |_| async { false })
.await
}
pub(crate) async fn finish_with_discard_fn<D, F>(
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
discard: D,
) -> anyhow::Result<Vec<SplitWriterResult>>
discard_fn: D,
) -> anyhow::Result<Vec<BatchWriterResult>>
where
D: FnOnce(&PersistentLayerKey) -> F,
D: Fn(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
{
let Self {
mut generated_layers,
inner,
..
mut batches, inner, ..
} = self;
let Some((start_key, inner)) = inner else {
return Ok(generated_layers);
};
if inner.num_keys() == 0 {
return Ok(generated_layers);
if let Some((start_key, writer)) = inner {
if writer.num_keys() != 0 {
let end_key = self.last_key_written.next();
batches.add_unfinished_delta_writer(
writer,
start_key..end_key,
self.lsn_range.clone(),
);
}
}
let end_key = self.last_key_written.next();
let layer_key = PersistentLayerKey {
key_range: start_key..end_key,
lsn_range: self.lsn_range.clone(),
is_delta: true,
};
if discard(&layer_key).await {
generated_layers.push(SplitWriterResult::Discarded(layer_key));
} else {
let (desc, path) = inner.finish(end_key, ctx).await?;
let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
generated_layers.push(SplitWriterResult::Produced(delta_layer));
}
Ok(generated_layers)
batches.finish_with_discard_fn(tline, ctx, discard_fn).await
}
#[cfg(test)]
@@ -369,15 +390,10 @@ impl SplitDeltaLayerWriter {
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<Vec<SplitWriterResult>> {
) -> anyhow::Result<Vec<BatchWriterResult>> {
self.finish_with_discard_fn(tline, ctx, |_| async { false })
.await
}
/// This function will be deprecated with #8841.
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, Option<DeltaLayerWriter>)> {
Ok((self.generated_layers, self.inner.map(|x| x.1)))
}
}
#[cfg(test)]
@@ -447,7 +463,7 @@ mod tests {
.unwrap();
image_writer
.put_image(get_key(0), get_img(0), &tline, &ctx)
.put_image(get_key(0), get_img(0), &ctx)
.await
.unwrap();
let layers = image_writer
@@ -457,13 +473,7 @@ mod tests {
assert_eq!(layers.len(), 1);
delta_writer
.put_value(
get_key(0),
Lsn(0x18),
Value::Image(get_img(0)),
&tline,
&ctx,
)
.put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
.await
.unwrap();
let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
@@ -486,14 +496,18 @@ mod tests {
#[tokio::test]
async fn write_split() {
// Test the split writer with retaining all the layers we have produced (discard=false)
write_split_helper("split_writer_write_split", false).await;
}
#[tokio::test]
async fn write_split_discard() {
write_split_helper("split_writer_write_split_discard", false).await;
// Test the split writer with discarding all the layers we have produced (discard=true)
write_split_helper("split_writer_write_split_discard", true).await;
}
/// Test the image+delta writer by writing a large number of images and deltas. If discard is
/// set to true, all layers will be discarded.
async fn write_split_helper(harness_name: &'static str, discard: bool) {
let harness = TenantHarness::create(harness_name).await.unwrap();
let (tenant, ctx) = harness.load().await;
@@ -527,69 +541,63 @@ mod tests {
for i in 0..N {
let i = i as u32;
image_writer
.put_image_with_discard_fn(get_key(i), get_large_img(), &tline, &ctx, |_| async {
discard
})
.put_image(get_key(i), get_large_img(), &ctx)
.await
.unwrap();
delta_writer
.put_value_with_discard_fn(
get_key(i),
Lsn(0x20),
Value::Image(get_large_img()),
&tline,
&ctx,
|_| async { discard },
)
.put_value(get_key(i), Lsn(0x20), Value::Image(get_large_img()), &ctx)
.await
.unwrap();
}
let image_layers = image_writer
.finish(&tline, &ctx, get_key(N as u32))
.finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard })
.await
.unwrap();
let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
if discard {
for layer in image_layers {
layer.into_discarded_layer();
}
for layer in delta_layers {
layer.into_discarded_layer();
}
} else {
let image_layers = image_layers
.into_iter()
.map(|x| x.into_resident_layer())
.collect_vec();
let delta_layers = delta_layers
.into_iter()
.map(|x| x.into_resident_layer())
.collect_vec();
assert_eq!(image_layers.len(), N / 512 + 1);
assert_eq!(delta_layers.len(), N / 512 + 1);
assert_eq!(
delta_layers.first().unwrap().layer_desc().key_range.start,
get_key(0)
);
assert_eq!(
delta_layers.last().unwrap().layer_desc().key_range.end,
get_key(N as u32)
);
for idx in 0..image_layers.len() {
assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN);
assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX);
assert_ne!(delta_layers[idx].layer_desc().key_range.start, Key::MIN);
assert_ne!(delta_layers[idx].layer_desc().key_range.end, Key::MAX);
if idx > 0 {
assert_eq!(
image_layers[idx - 1].layer_desc().key_range.end,
image_layers[idx].layer_desc().key_range.start
);
assert_eq!(
delta_layers[idx - 1].layer_desc().key_range.end,
delta_layers[idx].layer_desc().key_range.start
);
let delta_layers = delta_writer
.finish_with_discard_fn(&tline, &ctx, |_| async { discard })
.await
.unwrap();
let image_layers = image_layers
.into_iter()
.map(|x| {
if discard {
x.into_discarded_layer()
} else {
x.into_resident_layer().layer_desc().key()
}
})
.collect_vec();
let delta_layers = delta_layers
.into_iter()
.map(|x| {
if discard {
x.into_discarded_layer()
} else {
x.into_resident_layer().layer_desc().key()
}
})
.collect_vec();
assert_eq!(image_layers.len(), N / 512 + 1);
assert_eq!(delta_layers.len(), N / 512 + 1);
assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0));
assert_eq!(
delta_layers.last().unwrap().key_range.end,
get_key(N as u32)
);
for idx in 0..image_layers.len() {
assert_ne!(image_layers[idx].key_range.start, Key::MIN);
assert_ne!(image_layers[idx].key_range.end, Key::MAX);
assert_ne!(delta_layers[idx].key_range.start, Key::MIN);
assert_ne!(delta_layers[idx].key_range.end, Key::MAX);
if idx > 0 {
assert_eq!(
image_layers[idx - 1].key_range.end,
image_layers[idx].key_range.start
);
assert_eq!(
delta_layers[idx - 1].key_range.end,
delta_layers[idx].key_range.start
);
}
}
}
@@ -629,11 +637,11 @@ mod tests {
.unwrap();
image_writer
.put_image(get_key(0), get_img(0), &tline, &ctx)
.put_image(get_key(0), get_img(0), &ctx)
.await
.unwrap();
image_writer
.put_image(get_key(1), get_large_img(), &tline, &ctx)
.put_image(get_key(1), get_large_img(), &ctx)
.await
.unwrap();
let layers = image_writer
@@ -643,23 +651,11 @@ mod tests {
assert_eq!(layers.len(), 2);
delta_writer
.put_value(
get_key(0),
Lsn(0x18),
Value::Image(get_img(0)),
&tline,
&ctx,
)
.put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
.await
.unwrap();
delta_writer
.put_value(
get_key(1),
Lsn(0x1A),
Value::Image(get_large_img()),
&tline,
&ctx,
)
.put_value(get_key(1), Lsn(0x1A), Value::Image(get_large_img()), &ctx)
.await
.unwrap();
let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
@@ -723,7 +719,6 @@ mod tests {
get_key(0),
Lsn(i as u64 * 16 + 0x10),
Value::Image(get_large_img()),
&tline,
&ctx,
)
.await

View File

@@ -515,8 +515,8 @@ impl DeltaLayerWriterInner {
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let temp_path = self.path.clone();
let result = self.finish0(key_end, ctx).await;
if result.is_err() {
tracing::info!(%temp_path, "cleaning up temporary file after error during writing");
if let Err(ref e) = result {
tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
if let Err(e) = std::fs::remove_file(&temp_path) {
tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
}

View File

@@ -827,6 +827,25 @@ impl ImageLayerWriterInner {
self,
ctx: &RequestContext,
end_key: Option<Key>,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let temp_path = self.path.clone();
let result = self.finish0(ctx, end_key).await;
if let Err(ref e) = result {
tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
if let Err(e) = std::fs::remove_file(&temp_path) {
tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
}
}
result
}
///
/// Finish writing the image layer.
///
async fn finish0(
self,
ctx: &RequestContext,
end_key: Option<Key>,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
@@ -990,7 +1009,7 @@ impl ImageLayerWriter {
self.inner.take().unwrap().finish(ctx, None).await
}
/// Finish writing the image layer with an end key, used in [`super::split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
/// Finish writing the image layer with an end key, used in [`super::batch_split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
pub(super) async fn finish_with_end_key(
mut self,
end_key: Key,

View File

@@ -371,7 +371,7 @@ pub struct Timeline {
/// Prevent two tasks from deleting the timeline at the same time. If held, the
/// timeline is being deleted. If 'true', the timeline has already been deleted.
pub delete_progress: Arc<tokio::sync::Mutex<DeleteTimelineFlow>>,
pub delete_progress: TimelineDeleteProgress,
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
@@ -426,6 +426,8 @@ pub struct Timeline {
pub(crate) attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
}
pub type TimelineDeleteProgress = Arc<tokio::sync::Mutex<DeleteTimelineFlow>>;
pub struct WalReceiverInfo {
pub wal_source_connconf: PgConnectionConfig,
pub last_received_msg_lsn: Lsn,
@@ -2250,7 +2252,7 @@ impl Timeline {
eviction_task_timeline_state: tokio::sync::Mutex::new(
EvictionTaskTimelineState::default(),
),
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())),
delete_progress: TimelineDeleteProgress::default(),
cancel,
gate: Gate::default(),
@@ -2402,7 +2404,7 @@ impl Timeline {
pub(super) async fn load_layer_map(
&self,
disk_consistent_lsn: Lsn,
index_part: Option<IndexPart>,
index_part: IndexPart,
) -> anyhow::Result<()> {
use init::{Decision::*, Discovered, DismissedLayer};
use LayerName::*;
@@ -2466,8 +2468,7 @@ impl Timeline {
);
}
let decided =
init::reconcile(discovered_layers, index_part.as_ref(), disk_consistent_lsn);
let decided = init::reconcile(discovered_layers, &index_part, disk_consistent_lsn);
let mut loaded_layers = Vec::new();
let mut needs_cleanup = Vec::new();

View File

@@ -32,11 +32,11 @@ use crate::page_cache;
use crate::statvfs::Statvfs;
use crate::tenant::checks::check_valid_layermap;
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::storage_layer::batch_split_writer::{
BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
};
use crate::tenant::storage_layer::filter_iterator::FilterIterator;
use crate::tenant::storage_layer::merge_iterator::MergeIterator;
use crate::tenant::storage_layer::split_writer::{
SplitDeltaLayerWriter, SplitImageLayerWriter, SplitWriterResult,
};
use crate::tenant::storage_layer::{
AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
};
@@ -121,18 +121,12 @@ impl KeyHistoryRetention {
async fn pipe_to(
self,
key: Key,
tline: &Arc<Timeline>,
delta_writer: &mut SplitDeltaLayerWriter,
mut image_writer: Option<&mut SplitImageLayerWriter>,
stat: &mut CompactionStatistics,
dry_run: bool,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut first_batch = true;
let discard = |key: &PersistentLayerKey| {
let key = key.clone();
async move { Self::discard_key(&key, tline, dry_run).await }
};
for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon {
if first_batch {
if logs.len() == 1 && logs[0].1.is_image() {
@@ -141,45 +135,30 @@ impl KeyHistoryRetention {
};
stat.produce_image_key(img);
if let Some(image_writer) = image_writer.as_mut() {
image_writer
.put_image_with_discard_fn(key, img.clone(), tline, ctx, discard)
.await?;
image_writer.put_image(key, img.clone(), ctx).await?;
} else {
delta_writer
.put_value_with_discard_fn(
key,
cutoff_lsn,
Value::Image(img.clone()),
tline,
ctx,
discard,
)
.put_value(key, cutoff_lsn, Value::Image(img.clone()), ctx)
.await?;
}
} else {
for (lsn, val) in logs {
stat.produce_key(&val);
delta_writer
.put_value_with_discard_fn(key, lsn, val, tline, ctx, discard)
.await?;
delta_writer.put_value(key, lsn, val, ctx).await?;
}
}
first_batch = false;
} else {
for (lsn, val) in logs {
stat.produce_key(&val);
delta_writer
.put_value_with_discard_fn(key, lsn, val, tline, ctx, discard)
.await?;
delta_writer.put_value(key, lsn, val, ctx).await?;
}
}
}
let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
for (lsn, val) in above_horizon_logs {
stat.produce_key(&val);
delta_writer
.put_value_with_discard_fn(key, lsn, val, tline, ctx, discard)
.await?;
delta_writer.put_value(key, lsn, val, ctx).await?;
}
Ok(())
}
@@ -1990,11 +1969,9 @@ impl Timeline {
retention
.pipe_to(
*last_key,
self,
&mut delta_layer_writer,
image_layer_writer.as_mut(),
&mut stat,
dry_run,
ctx,
)
.await?;
@@ -2021,11 +1998,9 @@ impl Timeline {
retention
.pipe_to(
last_key,
self,
&mut delta_layer_writer,
image_layer_writer.as_mut(),
&mut stat,
dry_run,
ctx,
)
.await?;
@@ -2041,8 +2016,7 @@ impl Timeline {
.finish_with_discard_fn(self, ctx, Key::MAX, discard)
.await?
} else {
let (layers, _) = writer.take()?;
assert!(layers.is_empty(), "image layers produced in dry run mode?");
drop(writer);
Vec::new()
}
} else {
@@ -2054,8 +2028,7 @@ impl Timeline {
.finish_with_discard_fn(self, ctx, discard)
.await?
} else {
let (layers, _) = delta_layer_writer.take()?;
assert!(layers.is_empty(), "delta layers produced in dry run mode?");
drop(delta_layer_writer);
Vec::new()
};
@@ -2065,11 +2038,11 @@ impl Timeline {
let produced_image_layers_len = produced_image_layers.len();
for action in produced_delta_layers {
match action {
SplitWriterResult::Produced(layer) => {
BatchWriterResult::Produced(layer) => {
stat.produce_delta_layer(layer.layer_desc().file_size());
compact_to.push(layer);
}
SplitWriterResult::Discarded(l) => {
BatchWriterResult::Discarded(l) => {
keep_layers.insert(l);
stat.discard_delta_layer();
}
@@ -2077,11 +2050,11 @@ impl Timeline {
}
for action in produced_image_layers {
match action {
SplitWriterResult::Produced(layer) => {
BatchWriterResult::Produced(layer) => {
stat.produce_image_layer(layer.layer_desc().file_size());
compact_to.push(layer);
}
SplitWriterResult::Discarded(l) => {
BatchWriterResult::Discarded(l) => {
keep_layers.insert(l);
stat.discard_image_layer();
}

View File

@@ -14,7 +14,9 @@ use crate::{
task_mgr::{self, TaskKind},
tenant::{
metadata::TimelineMetadata,
remote_timeline_client::{PersistIndexPartWithDeletedFlagError, RemoteTimelineClient},
remote_timeline_client::{
self, PersistIndexPartWithDeletedFlagError, RemoteTimelineClient,
},
CreateTimelineCause, DeleteTimelineError, Tenant, TimelineOrOffloaded,
},
};
@@ -25,12 +27,9 @@ use super::{Timeline, TimelineResources};
/// during attach or pageserver restart.
/// See comment in persist_index_part_with_deleted_flag.
async fn set_deleted_in_remote_index(
timeline: &TimelineOrOffloaded,
remote_client: &Arc<RemoteTimelineClient>,
) -> Result<(), DeleteTimelineError> {
let res = timeline
.remote_client()
.persist_index_part_with_deleted_flag()
.await;
let res = remote_client.persist_index_part_with_deleted_flag().await;
match res {
// If we (now, or already) marked it successfully as deleted, we can proceed
Ok(()) | Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(_)) => (),
@@ -129,12 +128,10 @@ pub(super) async fn delete_local_timeline_directory(
}
/// Removes remote layers and an index file after them.
async fn delete_remote_layers_and_index(timeline: &TimelineOrOffloaded) -> anyhow::Result<()> {
timeline
.remote_client()
.delete_all()
.await
.context("delete_all")
async fn delete_remote_layers_and_index(
remote_client: &Arc<RemoteTimelineClient>,
) -> anyhow::Result<()> {
remote_client.delete_all().await.context("delete_all")
}
/// It is important that this gets called when DeletionGuard is being held.
@@ -179,6 +176,32 @@ async fn remove_maybe_offloaded_timeline_from_tenant(
Ok(())
}
/// It is important that this gets called when DeletionGuard is being held.
/// For more context see comments in [`DeleteTimelineFlow::prepare`]
async fn upload_new_tenant_manifest(
tenant: &Tenant,
_: &DeletionGuard, // using it as a witness
) -> anyhow::Result<()> {
// This is susceptible to race conditions, i.e. we won't continue deletions if there is a crash
// between the deletion of the index-part.json and reaching of this code.
// So indeed, the tenant manifest might refer to an offloaded timeline which has already been deleted.
// However, we handle this case in tenant loading code so the next time we attach, the issue is
// resolved.
let manifest = tenant.tenant_manifest();
// TODO: generation support
let generation = remote_timeline_client::TENANT_MANIFEST_GENERATION;
remote_timeline_client::upload_tenant_manifest(
&tenant.remote_storage,
&tenant.tenant_shard_id,
generation,
&manifest,
&tenant.cancel,
)
.await?;
Ok(())
}
/// Orchestrates timeline shut down of all timeline tasks, removes its in-memory structures,
/// and deletes its data from both disk and s3.
/// The sequence of steps:
@@ -235,7 +258,8 @@ impl DeleteTimelineFlow {
))?
});
set_deleted_in_remote_index(&timeline).await?;
let remote_client = timeline.remote_client_maybe_construct(tenant);
set_deleted_in_remote_index(&remote_client).await?;
fail::fail_point!("timeline-delete-before-schedule", |_| {
Err(anyhow::anyhow!(
@@ -243,7 +267,13 @@ impl DeleteTimelineFlow {
))?
});
Self::schedule_background(guard, tenant.conf, Arc::clone(tenant), timeline);
Self::schedule_background(
guard,
tenant.conf,
Arc::clone(tenant),
timeline,
remote_client,
);
Ok(())
}
@@ -301,8 +331,9 @@ impl DeleteTimelineFlow {
guard.mark_in_progress()?;
let remote_client = timeline.remote_client.clone();
let timeline = TimelineOrOffloaded::Timeline(timeline);
Self::schedule_background(guard, tenant.conf, tenant, timeline);
Self::schedule_background(guard, tenant.conf, tenant, timeline, remote_client);
Ok(())
}
@@ -380,6 +411,7 @@ impl DeleteTimelineFlow {
conf: &'static PageServerConf,
tenant: Arc<Tenant>,
timeline: TimelineOrOffloaded,
remote_client: Arc<RemoteTimelineClient>,
) {
let tenant_shard_id = timeline.tenant_shard_id();
let timeline_id = timeline.timeline_id();
@@ -391,7 +423,7 @@ impl DeleteTimelineFlow {
Some(timeline_id),
"timeline_delete",
async move {
if let Err(err) = Self::background(guard, conf, &tenant, &timeline).await {
if let Err(err) = Self::background(guard, conf, &tenant, &timeline, remote_client).await {
error!("Error: {err:#}");
if let TimelineOrOffloaded::Timeline(timeline) = timeline {
timeline.set_broken(format!("{err:#}"))
@@ -408,6 +440,7 @@ impl DeleteTimelineFlow {
conf: &PageServerConf,
tenant: &Tenant,
timeline: &TimelineOrOffloaded,
remote_client: Arc<RemoteTimelineClient>,
) -> Result<(), DeleteTimelineError> {
// Offloaded timelines have no local state
// TODO: once we persist offloaded information, delete the timeline from there, too
@@ -415,12 +448,14 @@ impl DeleteTimelineFlow {
delete_local_timeline_directory(conf, tenant.tenant_shard_id, timeline).await?;
}
delete_remote_layers_and_index(timeline).await?;
delete_remote_layers_and_index(&remote_client).await?;
pausable_failpoint!("in_progress_delete");
remove_maybe_offloaded_timeline_from_tenant(tenant, timeline, &guard).await?;
upload_new_tenant_manifest(tenant, &guard).await?;
*guard = Self::Finished;
Ok(())

View File

@@ -125,19 +125,9 @@ pub(super) enum DismissedLayer {
/// Merges local discoveries and remote [`IndexPart`] to a collection of decisions.
pub(super) fn reconcile(
local_layers: Vec<(LayerName, LocalLayerFileMetadata)>,
index_part: Option<&IndexPart>,
index_part: &IndexPart,
disk_consistent_lsn: Lsn,
) -> Vec<(LayerName, Result<Decision, DismissedLayer>)> {
let Some(index_part) = index_part else {
// If we have no remote metadata, no local layer files are considered valid to load
return local_layers
.into_iter()
.map(|(layer_name, local_metadata)| {
(layer_name, Err(DismissedLayer::LocalOnly(local_metadata)))
})
.collect();
};
let mut result = Vec::new();
let mut remote_layers = HashMap::new();

View File

@@ -1,17 +1,17 @@
use std::sync::Arc;
use crate::tenant::{OffloadedTimeline, Tenant, TimelineOrOffloaded};
use super::{
delete::{delete_local_timeline_directory, DeleteTimelineFlow, DeletionGuard},
Timeline,
};
use super::delete::{delete_local_timeline_directory, DeleteTimelineFlow, DeletionGuard};
use super::Timeline;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::{remote_timeline_client, OffloadedTimeline, Tenant, TimelineOrOffloaded};
pub(crate) async fn offload_timeline(
tenant: &Tenant,
timeline: &Arc<Timeline>,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_and_timeline_id();
tracing::info!("offloading archived timeline");
let (timeline, guard) = DeleteTimelineFlow::prepare(tenant, timeline.timeline_id)?;
let TimelineOrOffloaded::Timeline(timeline) = timeline else {
@@ -19,14 +19,28 @@ pub(crate) async fn offload_timeline(
return Ok(());
};
let is_archived = timeline.is_archived();
match is_archived {
Some(true) => (),
Some(false) => {
tracing::warn!(?is_archived, "tried offloading a non-archived timeline");
anyhow::bail!("timeline isn't archived");
}
None => {
tracing::warn!(
?is_archived,
"tried offloading a timeline where manifest is not yet available"
);
anyhow::bail!("timeline manifest hasn't been loaded yet");
}
}
// Now that the Timeline is in Stopping state, request all the related tasks to shut down.
timeline.shutdown(super::ShutdownMode::Hard).await;
// TODO extend guard mechanism above with method
// to make deletions possible while offloading is in progress
// TODO mark timeline as offloaded in S3
let conf = &tenant.conf;
delete_local_timeline_directory(conf, tenant.tenant_shard_id, &timeline).await?;
@@ -36,10 +50,31 @@ pub(crate) async fn offload_timeline(
let mut offloaded_timelines = tenant.timelines_offloaded.lock().unwrap();
offloaded_timelines.insert(
timeline.timeline_id,
Arc::new(OffloadedTimeline::from_timeline(&timeline)),
Arc::new(
OffloadedTimeline::from_timeline(&timeline)
.expect("we checked above that timeline was ready"),
),
);
}
// Last step: mark timeline as offloaded in S3
// TODO: maybe move this step above, right above deletion of the local timeline directory,
// then there is no potential race condition where we partially offload a timeline, and
// at the next restart attach it again.
// For that to happen, we'd need to make the manifest reflect our *intended* state,
// not our actual state of offloaded timelines.
let manifest = tenant.tenant_manifest();
// TODO: generation support
let generation = remote_timeline_client::TENANT_MANIFEST_GENERATION;
remote_timeline_client::upload_tenant_manifest(
&tenant.remote_storage,
&tenant.tenant_shard_id,
generation,
&manifest,
&tenant.cancel,
)
.await?;
Ok(())
}

File diff suppressed because it is too large Load Diff

View File

@@ -32,6 +32,8 @@ DATA = \
neon--1.2--1.3.sql \
neon--1.3--1.4.sql \
neon--1.4--1.5.sql \
neon--1.5--1.6.sql \
neon--1.6--1.5.sql \
neon--1.5--1.4.sql \
neon--1.4--1.3.sql \
neon--1.3--1.2.sql \
@@ -54,7 +56,7 @@ walproposer-lib: libwalproposer.a;
.PHONY: libwalproposer.a
libwalproposer.a: $(WALPROP_OBJS)
rm -f $@
$(RM) $@
$(AR) $(AROPT) $@ $^
# needs vars:

View File

@@ -767,7 +767,7 @@ HandleDropRole(DropRoleStmt *stmt)
entry->type = Op_Delete;
entry->password = NULL;
if (!found)
memset(entry->old_name, 0, sizeof(entry));
memset(entry->old_name, 0, sizeof(entry->old_name));
}
}

View File

@@ -22,6 +22,7 @@
#include "neon_pgversioncompat.h"
#include "access/parallel.h"
#include "access/xlog.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pagestore_client.h"
@@ -30,22 +31,28 @@
#include "port/pg_iovec.h"
#include "postmaster/bgworker.h"
#include RELFILEINFO_HDR
#include "replication/message.h"
#include "storage/buf_internals.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/pg_shmem.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/dynahash.h"
#include "utils/guc.h"
#if PG_VERSION_NUM >= 150000
#include "access/xlogrecovery.h"
#endif
#include "hll.h"
#include "bitmap.h"
#include "neon.h"
#include "neon_perf_counters.h"
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "Assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "LFC: assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
/*
* Local file cache is used to temporary store relations pages in local file system.
@@ -100,7 +107,9 @@ typedef struct FileCacheEntry
BufferTag key;
uint32 hash;
uint32 offset;
uint32 access_count;
uint32 access_count : 30;
uint32 prewarm_requested : 1; /* entry should be filled by prewarm */
uint32 prewarm_started : 1; /* chunk is written by lfc_prewarm */
uint32 bitmap[CHUNK_BITMAP_SIZE];
dlist_node list_node; /* LRU/holes list node */
} FileCacheEntry;
@@ -118,26 +127,57 @@ typedef struct FileCacheControl
uint64 writes; /* number of writes issued */
uint64 time_read; /* time spent reading (us) */
uint64 time_write; /* time spent writing (us) */
uint32 prewarm_total_chunks;
uint32 prewarm_curr_chunk;
uint32 prewarmed_pages;
uint32 skipped_pages;
dlist_head lru; /* double linked list for LRU replacement
* algorithm */
dlist_head holes; /* double linked list of punched holes */
HyperLogLogState wss_estimation; /* estimation of working set size */
} FileCacheControl;
typedef struct FileCacheStateEntry
{
BufferTag key;
uint32 bitmap[CHUNK_BITMAP_SIZE];
} FileCacheStateEntry;
static HTAB *lfc_hash;
static int lfc_desc = 0;
static LWLockId lfc_lock;
static int lfc_max_size;
static int lfc_size_limit;
static int lfc_prewarm_limit;
static int lfc_prewarm_batch;
static char *lfc_path;
static FileCacheControl *lfc_ctl;
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM>=150000
static shmem_request_hook_type prev_shmem_request_hook;
#endif
static CustomCheckpointHookType PrevCheckpointHook;
#define LFC_ENABLED() (lfc_ctl->limit != 0)
PGDLLEXPORT void LfcPrewarmMain(Datum main_arg);
static void
LfcCheckpointHook(int flags)
{
if (flags & CHECKPOINT_IS_SHUTDOWN)
{
lfc_save_state();
}
if (PrevCheckpointHook)
{
PrevCheckpointHook(flags);
}
}
/*
* Local file cache is optional and Neon can work without it.
* In case of any any errors with this cache, we should disable it but to not throw error.
@@ -149,7 +189,7 @@ lfc_disable(char const *op)
{
int fd;
elog(WARNING, "Failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
elog(WARNING, "LFC: failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
/* Invalidate hash */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
@@ -184,7 +224,7 @@ lfc_disable(char const *op)
pgstat_report_wait_end();
if (rc < 0)
elog(WARNING, "Failed to truncate local file cache %s: %m", lfc_path);
elog(WARNING, "LFC: failed to truncate local file cache %s: %m", lfc_path);
}
}
@@ -196,7 +236,7 @@ lfc_disable(char const *op)
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
if (fd < 0)
elog(WARNING, "Failed to recreate local file cache %s: %m", lfc_path);
elog(WARNING, "LFC: failed to recreate local file cache %s: %m", lfc_path);
else
close(fd);
@@ -236,6 +276,17 @@ lfc_ensure_opened(void)
return enabled;
}
PGDLLEXPORT void
LfcPrewarmMain(Datum main_arg)
{
pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
lfc_load_pages();
}
static void
lfc_shmem_startup(void)
{
@@ -267,14 +318,7 @@ lfc_shmem_startup(void)
n_chunks + 1, n_chunks + 1,
&info,
HASH_ELEM | HASH_BLOBS);
lfc_ctl->generation = 0;
lfc_ctl->size = 0;
lfc_ctl->used = 0;
lfc_ctl->hits = 0;
lfc_ctl->misses = 0;
lfc_ctl->writes = 0;
lfc_ctl->time_read = 0;
lfc_ctl->time_write = 0;
memset(lfc_ctl, 0, sizeof *lfc_ctl);
dlist_init(&lfc_ctl->lru);
dlist_init(&lfc_ctl->holes);
@@ -285,7 +329,7 @@ lfc_shmem_startup(void)
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
if (fd < 0)
{
elog(WARNING, "Failed to create local file cache %s: %m", lfc_path);
elog(WARNING, "LFC: failed to create local file cache %s: %m", lfc_path);
lfc_ctl->limit = 0;
}
else
@@ -295,6 +339,9 @@ lfc_shmem_startup(void)
}
}
LWLockRelease(AddinShmemInitLock);
PrevCheckpointHook = CustomCheckpointHook;
CustomCheckpointHook = LfcCheckpointHook;
}
static void
@@ -327,7 +374,7 @@ lfc_check_limit_hook(int *newval, void **extra, GucSource source)
{
if (*newval > lfc_max_size)
{
elog(ERROR, "neon.file_cache_size_limit can not be larger than neon.max_file_cache_size");
elog(ERROR, "LFC: neon.file_cache_size_limit can not be larger than neon.max_file_cache_size");
return false;
}
return true;
@@ -436,6 +483,32 @@ lfc_init(void)
NULL,
NULL);
DefineCustomIntVariable("neon.file_cache_prewarm_limit",
"Maximal number of prewarmed pages",
NULL,
&lfc_prewarm_limit,
0, /* disabled by default */
0,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
DefineCustomIntVariable("neon.file_cache_prewarm_batch",
"Number of pages retrivied by prewarm from page server",
NULL,
&lfc_prewarm_batch,
64,
1,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
if (lfc_max_size == 0)
return;
@@ -447,8 +520,326 @@ lfc_init(void)
#else
lfc_shmem_request();
#endif
if (lfc_prewarm_limit != 0)
{
BackgroundWorker bgw;
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
bgw.bgw_start_time = BgWorkerStart_ConsistentState;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "LfcPrewarmMain");
snprintf(bgw.bgw_name, BGW_MAXLEN, "LFC prewarm");
snprintf(bgw.bgw_type, BGW_MAXLEN, "LFC prewarm");
RegisterBackgroundWorker(&bgw);
}
}
static FileCacheStateEntry*
lfc_get_state(size_t* n_entries)
{
size_t max_entries = *n_entries;
size_t i = 0;
FileCacheStateEntry* fs = (FileCacheStateEntry*)palloc(sizeof(FileCacheStateEntry) * max_entries);
LWLockAcquire(lfc_lock, LW_SHARED);
if (LFC_ENABLED())
{
dlist_iter iter;
dlist_reverse_foreach(iter, &lfc_ctl->lru)
{
FileCacheEntry *entry = dlist_container(FileCacheEntry, list_node, iter.cur);
memcpy(&fs[i].key, &entry->key, sizeof entry->key);
memcpy(fs[i].bitmap, entry->bitmap, sizeof entry->bitmap);
if (++i == max_entries)
break;
}
elog(LOG, "LFC: save state of %ld chunks", (long)i);
}
LWLockRelease(lfc_lock);
*n_entries = i;
return fs;
}
/*
* Save state of local file cache as AUX file. Size of saved state is limited by lfc_prewarm_limit.
* This function saves first mostrecently used pages.
* It is expected to be called at shutdown checkpoint by checkpointer.
*/
void
lfc_save_state(void)
{
size_t n_entries = lfc_prewarm_limit;
FileCacheStateEntry* fs;
if (n_entries == 0)
return;
fs = lfc_get_state(&n_entries);
if (n_entries != 0)
{
#if PG_MAJORVERSION_NUM < 17
XLogFlush(LogLogicalMessage("neon-file:lfc.state", (char const*)fs, sizeof(FileCacheStateEntry) * n_entries, false));
#else
LogLogicalMessage("neon-file:lfc.state", (char const*)fs, sizeof(FileCacheStateEntry) * n_entries, false, true);
#endif
}
pfree(fs);
}
/*
* Prewarm LFC cache to the specified state.
*
* Prewarming can interfere with accesses to the pages by other backends. Usually access to LFC is protected by shared buffers: when Postgres
* is reading page, it pins shared buffer and enforces that only one backend is reading it, while other are waiting for read completion.
*
* But it is not true for prewarming: backend can fetch page itself, modify and then write it to LFC. At the
* same time `lfc_prewarm` tries to write deteriorated image of this page in LFC. To increase concurrency, access to LFC files (both read and write)
* is performed without holding locks. So it can happen that two or more processes write different content to the same location in the LFC file.
* Certainly we can not rely on disk content in this case.
*
* To solve this problem we use two flags in LFC entry: `prewarm_requested` and `prewarm_started`. First is set before prewarm is actually started.
* `lfc_prewarm` writes to LFC file only if this flag is set. This flag is cleared if any other backend performs write to this LFC chunk.
* In this case data loaded by `lfc_prewarm` is considered to be deteriorated and should be just ignored.
*
* But as far as write to LFC is performed without holding lock, there is no guarantee that no such write is in progress.
* This is why second flag is used: `prewarm_started`. It is set by `lfc_prewarm` when is starts writing page and cleared when write is completed.
* Any other backend writing to LFC should abandon it's write to LFC file (just not mark page as loaded in bitmap) if this flag is set.
* So neither `lfc_prewarm`, neither backend are saving page in LFC in this case - it is just skipped.
*/
static void
lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries)
{
ssize_t rc;
size_t snd_idx = 0, rcv_idx = 0;
size_t n_sent = 0, n_received = 0;
FileCacheEntry *entry;
uint64 generation;
uint32 entry_offset;
uint32 hash;
size_t i;
bool found;
int shard_no;
if (!lfc_ensure_opened())
return;
if (n_entries == 0 || fs == NULL)
{
elog(LOG, "LFC: prewarm is disabled");
return;
}
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
/* Do not prewarm more entries than LFC limit */
if (lfc_ctl->limit <= lfc_ctl->size)
{
LWLockRelease(lfc_lock);
return;
}
if (n_entries > lfc_ctl->limit - lfc_ctl->size)
{
n_entries = lfc_ctl->limit - lfc_ctl->size;
}
/* Initialize fields used to track prewarming progress */
lfc_ctl->prewarm_total_chunks = n_entries;
lfc_ctl->prewarm_curr_chunk = 0;
/*
* Load LFC state and add entries in hash table.
* It is needed to track modification of prewarmed pages.
* All such entries have `prewarm_requested` flag set. When entry is updated (some backed reads or writes
* some pages from this chunk), then `prewarm_requested` flag is cleared, prohibiting prewarm of this chunk.
* It prevents overwritting page updated or loaded by backend with older one, loaded by prewarm.
*/
for (i = 0; i < n_entries; i++)
{
hash = get_hash_value(lfc_hash, &fs[i].key);
entry = hash_search_with_hash_value(lfc_hash, &fs[i].key, hash, HASH_ENTER, &found);
/* Do not prewarm chunks which are already present in LFC */
if (!found)
{
entry->offset = lfc_ctl->size++;
entry->hash = hash;
entry->access_count = 0;
entry->prewarm_requested = true;
entry->prewarm_started = false;
memset(entry->bitmap, 0, sizeof entry->bitmap);
/* Most recently visted pages are stored first */
dlist_push_head(&lfc_ctl->lru, &entry->list_node);
lfc_ctl->used += 1;
}
}
LWLockRelease(lfc_lock);
elog(LOG, "LFC: start loading %ld chunks", (long)n_entries);
while (true)
{
size_t chunk_no = snd_idx / BLOCKS_PER_CHUNK;
size_t offs_in_chunk = snd_idx % BLOCKS_PER_CHUNK;
if (chunk_no < n_entries)
{
if (fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31)))
{
/*
* In case of prewarming replica we should be careful not to load too new version
* of the page - with LSN larger than current replay LSN.
* At primary we are always loading latest version.
*/
XLogRecPtr req_lsn = RecoveryInProgress() ? GetXLogReplayRecPtr(NULL) : UINT64_MAX;
NeonGetPageRequest request = {
.req.tag = T_NeonGetPageRequest,
/* lsn and not_modified_since are filled in below */
.rinfo = BufTagGetNRelFileInfo(fs[chunk_no].key),
.forknum = fs[chunk_no].key.forkNum,
.blkno = fs[chunk_no].key.blockNum + offs_in_chunk,
.req.lsn = req_lsn,
.req.not_modified_since = 0
};
shard_no = get_shard_number(&fs[chunk_no].key);
while (!page_server->send(shard_no, (NeonRequest *) &request)
|| !page_server->flush(shard_no))
{
/* do nothing */
}
n_sent += 1;
}
snd_idx += 1;
}
if (n_sent >= n_received + lfc_prewarm_batch || chunk_no == n_entries)
{
NeonResponse * resp;
do
{
chunk_no = rcv_idx / BLOCKS_PER_CHUNK;
offs_in_chunk = rcv_idx % BLOCKS_PER_CHUNK;
rcv_idx += 1;
} while (!(fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31))));
shard_no = get_shard_number(&fs[chunk_no].key);
resp = page_server->receive(shard_no);
lfc_ctl->prewarm_curr_chunk = chunk_no;
if (resp->tag != T_NeonGetPageResponse)
{
elog(LOG, "LFC: unexpected response type: %d", resp->tag);
return;
}
hash = get_hash_value(lfc_hash, &fs[chunk_no].key);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
entry = hash_search_with_hash_value(lfc_hash, &fs[chunk_no].key, hash, HASH_FIND, NULL);
if (entry != NULL && entry->prewarm_requested)
{
/* Unlink entry from LRU list to pin it for the duration of IO operation */
if (entry->access_count++ == 0)
dlist_delete(&entry->list_node);
generation = lfc_ctl->generation;
entry_offset = entry->offset;
Assert(!entry->prewarm_started);
entry->prewarm_started = true;
LWLockRelease(lfc_lock);
rc = pwrite(lfc_desc, ((NeonGetPageResponse*)resp)->page, BLCKSZ, ((off_t) entry_offset * BLOCKS_PER_CHUNK + offs_in_chunk) * BLCKSZ);
if (rc != BLCKSZ)
{
lfc_disable("write");
break;
}
else
{
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (lfc_ctl->generation == generation)
{
CriticalAssert(LFC_ENABLED());
if (--entry->access_count == 0)
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
if (entry->prewarm_requested)
{
lfc_ctl->used_pages += 1 - ((entry->bitmap[offs_in_chunk >> 5] >> (offs_in_chunk & 31)) & 1);
entry->bitmap[offs_in_chunk >> 5] |= 1 << (offs_in_chunk & 31);
lfc_ctl->prewarmed_pages += 1;
}
else
{
lfc_ctl->skipped_pages += 1;
}
Assert(entry->prewarm_started);
entry->prewarm_started = false;
}
LWLockRelease(lfc_lock);
}
}
else
{
Assert(!entry || !entry->prewarm_started);
lfc_ctl->skipped_pages += 1;
LWLockRelease(lfc_lock);
}
if (++n_received == n_sent && snd_idx >= n_entries * BLOCKS_PER_CHUNK)
{
break;
}
}
}
Assert(n_sent == n_received);
lfc_ctl->prewarm_curr_chunk = n_entries;
elog(LOG, "LFC: complete prewarming: loaded %ld pages", (long)n_received);
}
/*
* Load pages from LFC state saved in AUX file.
*/
void
lfc_load_pages(void)
{
int fd;
FileCacheStateEntry *fs;
ssize_t rc;
size_t max_entries = lfc_prewarm_limit;
fd = OpenTransientFile("lfc.state", O_RDONLY | PG_BINARY);
if (fd < 0)
{
elog(LOG, "LFC: state file is missing");
return;
}
fs = (FileCacheStateEntry*)palloc(sizeof(FileCacheStateEntry) * max_entries);
rc = read(fd, fs, sizeof(FileCacheStateEntry) * max_entries);
if (rc <= 0)
{
elog(LOG, "LFC: Failed to read state file: %m");
CloseTransientFile(fd);
}
else
{
CloseTransientFile(fd);
elog(LOG, "LFC: read state with %lu entries", (long)(rc / sizeof(FileCacheStateEntry)));
lfc_prewarm(fs, rc / sizeof(FileCacheStateEntry));
}
pfree(fs);
}
/*
* Check if page is present in the cache.
* Returns true if page is found in local cache.
@@ -616,6 +1007,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
/* remove the page from the cache */
entry->bitmap[chunk_offs >> 5] &= ~(1 << (chunk_offs & (32 - 1)));
entry->prewarm_requested = false; /* prohibit prewarm of this LFC entry */
if (entry->access_count == 0)
{
@@ -861,7 +1253,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
/*
/*
* For every chunk that has blocks we're interested in, we
* 1. get the chunk header
* 2. Check if the chunk actually has the blocks we're interested in
@@ -899,6 +1291,17 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (found)
{
if (entry->prewarm_started)
{
/*
* Some page of this chunk is currently written by `lfc_prewarm`.
* We should give-up not to interfere with it.
* But clearing `prewarm_requested` flag also will not allow `lfc_prewarm` to fix it result.
*/
entry->prewarm_requested = false;
LWLockRelease(lfc_lock);
return;
}
/*
* Unlink entry from LRU list to pin it for the duration of IO
* operation
@@ -928,7 +1331,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
{
/* Cache overflow: evict least recently used chunk */
FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->lru));
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
{
lfc_ctl->used_pages -= (victim->bitmap[i >> 5] >> (i & 31)) & 1;
@@ -944,10 +1347,10 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
FileCacheEntry *hole = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->holes));
uint32 offset = hole->offset;
bool hole_found;
hash_search_with_hash_value(lfc_hash, &hole->key, hole->hash, HASH_REMOVE, &hole_found);
CriticalAssert(hole_found);
lfc_ctl->used += 1;
entry->offset = offset; /* reuse the hole */
}
@@ -959,9 +1362,11 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
entry->access_count = 1;
entry->hash = hash;
entry->prewarm_started = false;
memset(entry->bitmap, 0, sizeof entry->bitmap);
}
entry->prewarm_requested = false; /* prohibit prewarm if LFC entry is updated by some backend */
generation = lfc_ctl->generation;
entry_offset = entry->offset;
LWLockRelease(lfc_lock);
@@ -1334,3 +1739,74 @@ approximate_working_set_size(PG_FUNCTION_ARGS)
}
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(save_local_cache_state);
Datum
save_local_cache_state(PG_FUNCTION_ARGS)
{
lfc_save_state();
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(get_local_cache_state);
Datum
get_local_cache_state(PG_FUNCTION_ARGS)
{
size_t n_entries = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0);
FileCacheStateEntry* fs = lfc_get_state(&n_entries);
size_t size_in_bytes = sizeof(FileCacheStateEntry) * n_entries;
bytea* res = (bytea*)palloc(VARHDRSZ + size_in_bytes);
SET_VARSIZE(res, VARHDRSZ + size_in_bytes);
memcpy(VARDATA(res), fs, size_in_bytes);
pfree(fs);
PG_RETURN_BYTEA_P(res);
}
PG_FUNCTION_INFO_V1(prewarm_local_cache);
Datum
prewarm_local_cache(PG_FUNCTION_ARGS)
{
bytea* state = PG_GETARG_BYTEA_PP(0);
uint32 n_entries = VARSIZE_ANY_EXHDR(state);
FileCacheStateEntry* fs = (FileCacheStateEntry*)VARDATA_ANY(state);
lfc_prewarm(fs, n_entries);
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(get_prewarm_info);
Datum
get_prewarm_info(PG_FUNCTION_ARGS)
{
Datum values[4];
bool nulls[4];
TupleDesc tupdesc;
if (lfc_size_limit == 0)
PG_RETURN_NULL();
tupdesc = CreateTemplateTupleDesc(4);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "total_chunks", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "curr_chunk", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prewarmed_pages", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "skipped_pages", INT4OID, -1, 0);
tupdesc = BlessTupleDesc(tupdesc);
MemSet(nulls, 0, sizeof(nulls));
LWLockAcquire(lfc_lock, LW_SHARED);
values[0] = Int32GetDatum(lfc_ctl->prewarm_total_chunks);
values[1] = Int32GetDatum(lfc_ctl->prewarm_curr_chunk);
values[2] = Int32GetDatum(lfc_ctl->prewarmed_pages);
values[3] = Int32GetDatum(lfc_ctl->skipped_pages);
LWLockRelease(lfc_lock);
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}

View File

@@ -0,0 +1,28 @@
\echo Use "ALTER EXTENSION neon UPDATE TO '1.6'" to load this file. \quit
CREATE FUNCTION save_local_cache_state()
RETURNS void
AS 'MODULE_PATHNAME', 'save_local_cache_state'
LANGUAGE C STRICT
PARALLEL UNSAFE;
CREATE FUNCTION get_prewarm_info(out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer)
RETURNS record
AS 'MODULE_PATHNAME', 'get_prewarm_info'
LANGUAGE C STRICT
PARALLEL SAFE;
CREATE FUNCTION get_local_cache_state(max_chunks integer default null)
RETURNS bytea
AS 'MODULE_PATHNAME', 'get_local_cache_state'
LANGUAGE C
PARALLEL UNSAFE;
CREATE FUNCTION prewarm_local_cache(state bytea)
RETURNS void
AS 'MODULE_PATHNAME', 'prewarm_local_cache'
LANGUAGE C STRICT
PARALLEL UNSAFE;

View File

@@ -0,0 +1,9 @@
DROP FUNCTION IF EXISTS save_local_cache_state();
DROP FUNCTION IF EXISTS get_prewarm_info(out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer);
DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer);
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea);

View File

@@ -276,6 +276,8 @@ extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, int nblocks, bits8 *bitmap);
extern void lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno);
extern void lfc_init(void);
extern void lfc_save_state(void);
extern void lfc_load_pages(void);
static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,

View File

@@ -16,7 +16,7 @@ use crate::context::RequestMonitoring;
use crate::control_plane::errors::GetEndpointJwksError;
use crate::http::parse_json_body_with_limit;
use crate::intern::RoleNameInt;
use crate::{EndpointId, RoleName};
use crate::types::{EndpointId, RoleName};
// TODO(conrad): make these configurable.
const CLOCK_SKEW_LEEWAY: Duration = Duration::from_secs(30);
@@ -669,7 +669,7 @@ mod tests {
use tokio::net::TcpListener;
use super::*;
use crate::RoleName;
use crate::types::RoleName;
fn new_ec_jwk(kid: String) -> (p256::SecretKey, jose_jwk::Jwk) {
let sk = p256::SecretKey::random(&mut OsRng);

View File

@@ -10,9 +10,10 @@ use crate::compute_ctl::ComputeCtlApi;
use crate::context::RequestMonitoring;
use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse, MetricsAuxInfo};
use crate::control_plane::NodeInfo;
use crate::http;
use crate::intern::{BranchIdTag, EndpointIdTag, InternId, ProjectIdTag};
use crate::types::EndpointId;
use crate::url::ApiUrl;
use crate::{http, EndpointId};
pub struct LocalBackend {
pub(crate) initialize: Semaphore,

View File

@@ -32,7 +32,8 @@ use crate::proxy::connect_compute::ComputeConnectBackend;
use crate::proxy::NeonOptions;
use crate::rate_limiter::{BucketRateLimiter, EndpointRateLimiter, RateBucketInfo};
use crate::stream::Stream;
use crate::{scram, stream, EndpointCacheKey, EndpointId, RoleName};
use crate::types::{EndpointCacheKey, EndpointId, RoleName};
use crate::{scram, stream};
/// Alternative to [`std::borrow::Cow`] but doesn't need `T: ToOwned` as we don't need that functionality
pub enum MaybeOwned<'a, T> {
@@ -551,7 +552,7 @@ mod tests {
async fn get_endpoint_jwks(
&self,
_ctx: &RequestMonitoring,
_endpoint: crate::EndpointId,
_endpoint: crate::types::EndpointId,
) -> Result<Vec<super::jwt::AuthRule>, control_plane::errors::GetEndpointJwksError>
{
unimplemented!()

View File

@@ -15,7 +15,7 @@ use crate::error::{ReportableError, UserFacingError};
use crate::metrics::{Metrics, SniKind};
use crate::proxy::NeonOptions;
use crate::serverless::SERVERLESS_DRIVER_SNI;
use crate::{EndpointId, RoleName};
use crate::types::{EndpointId, RoleName};
#[derive(Debug, Error, PartialEq, Eq, Clone)]
pub(crate) enum ComputeUserInfoParseError {

View File

@@ -5,7 +5,7 @@
use bstr::ByteSlice;
use crate::EndpointId;
use crate::types::EndpointId;
pub(crate) struct PasswordHackPayload {
pub(crate) endpoint: EndpointId,

View File

@@ -25,8 +25,8 @@ use proxy::rate_limiter::{
use proxy::scram::threadpool::ThreadPool;
use proxy::serverless::cancel_set::CancelSet;
use proxy::serverless::{self, GlobalConnPoolOptions};
use proxy::types::RoleName;
use proxy::url::ApiUrl;
use proxy::RoleName;
project_git_version!(GIT_VERSION);
project_build_tag!(BUILD_TAG);
@@ -177,7 +177,7 @@ async fn main() -> anyhow::Result<()> {
let mut maintenance_tasks = JoinSet::new();
let refresh_config_notify = Arc::new(Notify::new());
maintenance_tasks.spawn(proxy::handle_signals(shutdown.clone(), {
maintenance_tasks.spawn(proxy::signals::handle(shutdown.clone(), {
let refresh_config_notify = Arc::clone(&refresh_config_notify);
move || {
refresh_config_notify.notify_one();
@@ -216,7 +216,7 @@ async fn main() -> anyhow::Result<()> {
match futures::future::select(pin!(maintenance_tasks.join_next()), pin!(task)).await {
// exit immediately on maintenance task completion
Either::Left((Some(res), _)) => match proxy::flatten_err(res)? {},
Either::Left((Some(res), _)) => match proxy::error::flatten_err(res)? {},
// exit with error immediately if all maintenance tasks have ceased (should be caught by branch above)
Either::Left((None, _)) => bail!("no maintenance tasks running. invalid state"),
// exit immediately on client task error

View File

@@ -133,14 +133,14 @@ async fn main() -> anyhow::Result<()> {
proxy_listener,
cancellation_token.clone(),
));
let signals_task = tokio::spawn(proxy::handle_signals(cancellation_token, || {}));
let signals_task = tokio::spawn(proxy::signals::handle(cancellation_token, || {}));
// the signal task cant ever succeed.
// the main task can error, or can succeed on cancellation.
// we want to immediately exit on either of these cases
let signal = match futures::future::select(signals_task, main).await {
Either::Left((res, _)) => proxy::flatten_err(res)?,
Either::Right((res, _)) => return proxy::flatten_err(res),
Either::Left((res, _)) => proxy::error::flatten_err(res)?,
Either::Right((res, _)) => return proxy::error::flatten_err(res),
};
// maintenance tasks return `Infallible` success values, this is an impossible value

View File

@@ -495,7 +495,7 @@ async fn main() -> anyhow::Result<()> {
// maintenance tasks. these never return unless there's an error
let mut maintenance_tasks = JoinSet::new();
maintenance_tasks.spawn(proxy::handle_signals(cancellation_token.clone(), || {}));
maintenance_tasks.spawn(proxy::signals::handle(cancellation_token.clone(), || {}));
maintenance_tasks.spawn(http::health_server::task_main(
http_listener,
AppMetrics {
@@ -561,11 +561,11 @@ async fn main() -> anyhow::Result<()> {
.await
{
// exit immediately on maintenance task completion
Either::Left((Some(res), _)) => break proxy::flatten_err(res)?,
Either::Left((Some(res), _)) => break proxy::error::flatten_err(res)?,
// exit with error immediately if all maintenance tasks have ceased (should be caught by branch above)
Either::Left((None, _)) => bail!("no maintenance tasks running. invalid state"),
// exit immediately on client task error
Either::Right((Some(res), _)) => proxy::flatten_err(res)?,
Either::Right((Some(res), _)) => proxy::error::flatten_err(res)?,
// exit if all our client tasks have shutdown gracefully
Either::Right((None, _)) => return Ok(()),
}

View File

@@ -17,7 +17,7 @@ use crate::intern::{BranchIdInt, EndpointIdInt, ProjectIdInt};
use crate::metrics::{Metrics, RedisErrors, RedisEventsCount};
use crate::rate_limiter::GlobalRateLimiter;
use crate::redis::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
use crate::EndpointId;
use crate::types::EndpointId;
#[derive(Deserialize, Debug, Clone)]
pub(crate) struct ControlPlaneEventKey {

View File

@@ -17,7 +17,7 @@ use crate::auth::IpPattern;
use crate::config::ProjectInfoCacheOptions;
use crate::control_plane::AuthSecret;
use crate::intern::{EndpointIdInt, ProjectIdInt, RoleNameInt};
use crate::{EndpointId, RoleName};
use crate::types::{EndpointId, RoleName};
#[async_trait]
pub(crate) trait ProjectInfoCache {
@@ -368,7 +368,7 @@ impl Cache for ProjectInfoCacheImpl {
mod tests {
use super::*;
use crate::scram::ServerSecret;
use crate::ProjectId;
use crate::types::ProjectId;
#[tokio::test]
async fn test_project_info_cache_settings() {

View File

@@ -25,7 +25,7 @@ use crate::control_plane::provider::ApiLockError;
use crate::error::{ReportableError, UserFacingError};
use crate::metrics::{Metrics, NumDbConnectionsGuard};
use crate::proxy::neon_option;
use crate::Host;
use crate::types::Host;
pub const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node";

View File

@@ -4,8 +4,9 @@ use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use crate::http;
use crate::types::{DbName, RoleName};
use crate::url::ApiUrl;
use crate::{http, DbName, RoleName};
pub struct ComputeCtlApi {
pub(crate) api: http::Endpoint,

View File

@@ -20,7 +20,7 @@ use crate::rate_limiter::{RateBucketInfo, RateLimitAlgorithm, RateLimiterConfig}
use crate::scram::threadpool::ThreadPool;
use crate::serverless::cancel_set::CancelSet;
use crate::serverless::GlobalConnPoolOptions;
use crate::Host;
use crate::types::Host;
pub struct ProxyConfig {
pub tls_config: Option<TlsConfig>,

View File

@@ -19,7 +19,7 @@ use crate::intern::{BranchIdInt, ProjectIdInt};
use crate::metrics::{
ConnectOutcome, InvalidEndpointsGroup, LatencyTimer, Metrics, Protocol, Waiting,
};
use crate::{DbName, EndpointId, RoleName};
use crate::types::{DbName, EndpointId, RoleName};
pub mod parquet;

View File

@@ -161,6 +161,9 @@ pub(crate) enum Reason {
/// LockAlreadyTaken indicates that the we attempted to take a lock that was already taken.
#[serde(rename = "LOCK_ALREADY_TAKEN")]
LockAlreadyTaken,
/// ActiveEndpointsLimitExceeded indicates that the limit of concurrently active endpoints was exceeded.
#[serde(rename = "ACTIVE_ENDPOINTS_LIMIT_EXCEEDED")]
ActiveEndpointsLimitExceeded,
#[default]
#[serde(other)]
Unknown,
@@ -194,7 +197,8 @@ impl Reason {
| Reason::ComputeTimeQuotaExceeded
| Reason::WrittenDataQuotaExceeded
| Reason::DataTransferQuotaExceeded
| Reason::LogicalSizeQuotaExceeded => false,
| Reason::LogicalSizeQuotaExceeded
| Reason::ActiveEndpointsLimitExceeded => false,
// transitive error. control plane is currently busy
// but might be ready soon
Reason::RunningOperations

View File

@@ -21,8 +21,9 @@ use crate::control_plane::messages::MetricsAuxInfo;
use crate::control_plane::provider::{CachedAllowedIps, CachedRoleSecret};
use crate::error::io_error;
use crate::intern::RoleNameInt;
use crate::types::{BranchId, EndpointId, ProjectId, RoleName};
use crate::url::ApiUrl;
use crate::{compute, scram, BranchId, EndpointId, ProjectId, RoleName};
use crate::{compute, scram};
#[derive(Debug, Error)]
enum MockApiError {

View File

@@ -23,7 +23,8 @@ use crate::error::ReportableError;
use crate::intern::ProjectIdInt;
use crate::metrics::ApiLockMetrics;
use crate::rate_limiter::{DynamicLimiter, Outcome, RateLimiterConfig, Token};
use crate::{compute, scram, EndpointCacheKey, EndpointId};
use crate::types::{EndpointCacheKey, EndpointId};
use crate::{compute, scram};
pub(crate) mod errors {
use thiserror::Error;
@@ -87,36 +88,8 @@ pub(crate) mod errors {
Reason::ConcurrencyLimitReached => ErrorKind::ControlPlane,
Reason::LockAlreadyTaken => ErrorKind::ControlPlane,
Reason::RunningOperations => ErrorKind::ControlPlane,
Reason::Unknown => match &**e {
ControlPlaneError {
http_status_code:
http::StatusCode::NOT_FOUND | http::StatusCode::NOT_ACCEPTABLE,
..
} => crate::error::ErrorKind::User,
ControlPlaneError {
http_status_code: http::StatusCode::UNPROCESSABLE_ENTITY,
error,
..
} if error
.contains("compute time quota of non-primary branches is exceeded") =>
{
crate::error::ErrorKind::Quota
}
ControlPlaneError {
http_status_code: http::StatusCode::LOCKED,
error,
..
} if error.contains("quota exceeded")
|| error.contains("the limit for current plan reached") =>
{
crate::error::ErrorKind::Quota
}
ControlPlaneError {
http_status_code: http::StatusCode::TOO_MANY_REQUESTS,
..
} => crate::error::ErrorKind::ServiceRateLimit,
ControlPlaneError { .. } => crate::error::ErrorKind::ControlPlane,
},
Reason::ActiveEndpointsLimitExceeded => ErrorKind::ControlPlane,
Reason::Unknown => ErrorKind::ControlPlane,
},
ApiError::Transport(_) => crate::error::ErrorKind::ControlPlane,
}

View File

@@ -24,7 +24,8 @@ use crate::control_plane::errors::GetEndpointJwksError;
use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse, Reason};
use crate::metrics::{CacheOutcome, Metrics};
use crate::rate_limiter::WakeComputeRateLimiter;
use crate::{compute, http, scram, EndpointCacheKey, EndpointId};
use crate::types::{EndpointCacheKey, EndpointId};
use crate::{compute, http, scram};
const X_REQUEST_ID: HeaderName = HeaderName::from_static("x-request-id");

View File

@@ -1,7 +1,9 @@
use std::error::Error as StdError;
use std::{fmt, io};
use anyhow::Context;
use measured::FixedCardinalityLabel;
use tokio::task::JoinError;
/// Upcast (almost) any error into an opaque [`io::Error`].
pub(crate) fn io_error(e: impl Into<Box<dyn StdError + Send + Sync>>) -> io::Error {
@@ -97,3 +99,8 @@ impl ReportableError for tokio_postgres::error::Error {
}
}
}
/// Flattens `Result<Result<T>>` into `Result<T>`.
pub fn flatten_err<T>(r: Result<anyhow::Result<T>, JoinError>) -> anyhow::Result<T> {
r.context("join error").and_then(|x| x)
}

View File

@@ -7,7 +7,7 @@ use std::sync::OnceLock;
use lasso::{Capacity, MemoryLimits, Spur, ThreadedRodeo};
use rustc_hash::FxHasher;
use crate::{BranchId, EndpointId, ProjectId, RoleName};
use crate::types::{BranchId, EndpointId, ProjectId, RoleName};
pub trait InternId: Sized + 'static {
fn get_interner() -> &'static StringInterner<Self>;

View File

@@ -78,14 +78,6 @@
// List of temporarily allowed lints to unblock beta/nightly.
#![allow(unknown_lints)]
use std::convert::Infallible;
use anyhow::{bail, Context};
use intern::{EndpointIdInt, EndpointIdTag, InternId};
use tokio::task::JoinError;
use tokio_util::sync::CancellationToken;
use tracing::warn;
pub mod auth;
pub mod cache;
pub mod cancellation;
@@ -109,165 +101,9 @@ pub mod redis;
pub mod sasl;
pub mod scram;
pub mod serverless;
pub mod signals;
pub mod stream;
pub mod types;
pub mod url;
pub mod usage_metrics;
pub mod waiters;
/// Handle unix signals appropriately.
pub async fn handle_signals<F>(
token: CancellationToken,
mut refresh_config: F,
) -> anyhow::Result<Infallible>
where
F: FnMut(),
{
use tokio::signal::unix::{signal, SignalKind};
let mut hangup = signal(SignalKind::hangup())?;
let mut interrupt = signal(SignalKind::interrupt())?;
let mut terminate = signal(SignalKind::terminate())?;
loop {
tokio::select! {
// Hangup is commonly used for config reload.
_ = hangup.recv() => {
warn!("received SIGHUP");
refresh_config();
}
// Shut down the whole application.
_ = interrupt.recv() => {
warn!("received SIGINT, exiting immediately");
bail!("interrupted");
}
_ = terminate.recv() => {
warn!("received SIGTERM, shutting down once all existing connections have closed");
token.cancel();
}
}
}
}
/// Flattens `Result<Result<T>>` into `Result<T>`.
pub fn flatten_err<T>(r: Result<anyhow::Result<T>, JoinError>) -> anyhow::Result<T> {
r.context("join error").and_then(|x| x)
}
macro_rules! smol_str_wrapper {
($name:ident) => {
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct $name(smol_str::SmolStr);
impl $name {
#[allow(unused)]
pub(crate) fn as_str(&self) -> &str {
self.0.as_str()
}
}
impl std::fmt::Display for $name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl<T> std::cmp::PartialEq<T> for $name
where
smol_str::SmolStr: std::cmp::PartialEq<T>,
{
fn eq(&self, other: &T) -> bool {
self.0.eq(other)
}
}
impl<T> From<T> for $name
where
smol_str::SmolStr: From<T>,
{
fn from(x: T) -> Self {
Self(x.into())
}
}
impl AsRef<str> for $name {
fn as_ref(&self) -> &str {
self.0.as_ref()
}
}
impl std::ops::Deref for $name {
type Target = str;
fn deref(&self) -> &str {
&*self.0
}
}
impl<'de> serde::de::Deserialize<'de> for $name {
fn deserialize<D: serde::de::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
<smol_str::SmolStr as serde::de::Deserialize<'de>>::deserialize(d).map(Self)
}
}
impl serde::Serialize for $name {
fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
self.0.serialize(s)
}
}
};
}
const POOLER_SUFFIX: &str = "-pooler";
impl EndpointId {
fn normalize(&self) -> Self {
if let Some(stripped) = self.as_ref().strip_suffix(POOLER_SUFFIX) {
stripped.into()
} else {
self.clone()
}
}
fn normalize_intern(&self) -> EndpointIdInt {
if let Some(stripped) = self.as_ref().strip_suffix(POOLER_SUFFIX) {
EndpointIdTag::get_interner().get_or_intern(stripped)
} else {
self.into()
}
}
}
// 90% of role name strings are 20 characters or less.
smol_str_wrapper!(RoleName);
// 50% of endpoint strings are 23 characters or less.
smol_str_wrapper!(EndpointId);
// 50% of branch strings are 23 characters or less.
smol_str_wrapper!(BranchId);
// 90% of project strings are 23 characters or less.
smol_str_wrapper!(ProjectId);
// will usually equal endpoint ID
smol_str_wrapper!(EndpointCacheKey);
smol_str_wrapper!(DbName);
// postgres hostname, will likely be a port:ip addr
smol_str_wrapper!(Host);
// Endpoints are a bit tricky. Rare they might be branches or projects.
impl EndpointId {
pub(crate) fn is_endpoint(&self) -> bool {
self.0.starts_with("ep-")
}
pub(crate) fn is_branch(&self) -> bool {
self.0.starts_with("br-")
}
// pub(crate) fn is_project(&self) -> bool {
// !self.is_endpoint() && !self.is_branch()
// }
pub(crate) fn as_branch(&self) -> BranchId {
BranchId(self.0.clone())
}
pub(crate) fn as_project(&self) -> ProjectId {
ProjectId(self.0.clone())
}
}

View File

@@ -14,6 +14,7 @@ use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLog, HyperLogLogVec};
use tokio::time::{self, Instant};
use crate::control_plane::messages::ColdStartInfo;
use crate::error::ErrorKind;
#[derive(MetricGroup)]
#[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
@@ -325,23 +326,10 @@ pub enum ConnectionFailureKind {
ComputeUncached,
}
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
pub enum WakeupFailureKind {
BadComputeAddress,
ApiTransportError,
QuotaExceeded,
ApiConsoleLocked,
ApiConsoleBadRequest,
ApiConsoleOtherServerError,
ApiConsoleOtherError,
TimeoutError,
}
#[derive(LabelGroup)]
#[label(set = ConnectionFailuresBreakdownSet)]
pub struct ConnectionFailuresBreakdownGroup {
pub kind: WakeupFailureKind,
pub kind: ErrorKind,
pub retry: Bool,
}

View File

@@ -17,7 +17,7 @@ use crate::metrics::{
};
use crate::proxy::retry::{retry_after, should_retry, CouldRetry};
use crate::proxy::wake_compute::wake_compute;
use crate::Host;
use crate::types::Host;
const CONNECT_TIMEOUT: time::Duration = time::Duration::from_secs(2);

View File

@@ -32,7 +32,8 @@ use crate::protocol2::read_proxy_protocol;
use crate::proxy::handshake::{handshake, HandshakeData};
use crate::rate_limiter::EndpointRateLimiter;
use crate::stream::{PqStream, Stream};
use crate::{auth, compute, EndpointCacheKey};
use crate::types::EndpointCacheKey;
use crate::{auth, compute};
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";

View File

@@ -28,7 +28,8 @@ use crate::control_plane::provider::{
};
use crate::control_plane::{self, CachedNodeInfo, NodeInfo};
use crate::error::ErrorKind;
use crate::{sasl, scram, BranchId, EndpointId, ProjectId};
use crate::types::{BranchId, EndpointId, ProjectId};
use crate::{sasl, scram};
/// Generate a set of TLS certificates: CA + server.
fn generate_certs(

View File

@@ -1,15 +1,13 @@
use hyper::StatusCode;
use tracing::{error, info, warn};
use super::connect_compute::ComputeConnectBackend;
use crate::config::RetryConfig;
use crate::context::RequestMonitoring;
use crate::control_plane::errors::WakeComputeError;
use crate::control_plane::messages::{ControlPlaneError, Reason};
use crate::control_plane::provider::CachedNodeInfo;
use crate::error::ReportableError;
use crate::metrics::{
ConnectOutcome, ConnectionFailuresBreakdownGroup, Metrics, RetriesMetricGroup, RetryType,
WakeupFailureKind,
};
use crate::proxy::retry::{retry_after, should_retry};
@@ -60,62 +58,8 @@ pub(crate) async fn wake_compute<B: ComputeConnectBackend>(
}
fn report_error(e: &WakeComputeError, retry: bool) {
use crate::control_plane::errors::ApiError;
let kind = match e {
WakeComputeError::BadComputeAddress(_) => WakeupFailureKind::BadComputeAddress,
WakeComputeError::ApiError(ApiError::Transport(_)) => WakeupFailureKind::ApiTransportError,
WakeComputeError::ApiError(ApiError::ControlPlane(e)) => match e.get_reason() {
Reason::RoleProtected => WakeupFailureKind::ApiConsoleBadRequest,
Reason::ResourceNotFound => WakeupFailureKind::ApiConsoleBadRequest,
Reason::ProjectNotFound => WakeupFailureKind::ApiConsoleBadRequest,
Reason::EndpointNotFound => WakeupFailureKind::ApiConsoleBadRequest,
Reason::BranchNotFound => WakeupFailureKind::ApiConsoleBadRequest,
Reason::RateLimitExceeded => WakeupFailureKind::ApiConsoleLocked,
Reason::NonDefaultBranchComputeTimeExceeded => WakeupFailureKind::QuotaExceeded,
Reason::ActiveTimeQuotaExceeded => WakeupFailureKind::QuotaExceeded,
Reason::ComputeTimeQuotaExceeded => WakeupFailureKind::QuotaExceeded,
Reason::WrittenDataQuotaExceeded => WakeupFailureKind::QuotaExceeded,
Reason::DataTransferQuotaExceeded => WakeupFailureKind::QuotaExceeded,
Reason::LogicalSizeQuotaExceeded => WakeupFailureKind::QuotaExceeded,
Reason::ConcurrencyLimitReached => WakeupFailureKind::ApiConsoleLocked,
Reason::LockAlreadyTaken => WakeupFailureKind::ApiConsoleLocked,
Reason::RunningOperations => WakeupFailureKind::ApiConsoleLocked,
Reason::Unknown => match **e {
ControlPlaneError {
http_status_code: StatusCode::LOCKED,
ref error,
..
} if error.contains("written data quota exceeded")
|| error.contains("the limit for current plan reached") =>
{
WakeupFailureKind::QuotaExceeded
}
ControlPlaneError {
http_status_code: StatusCode::UNPROCESSABLE_ENTITY,
ref error,
..
} if error.contains("compute time quota of non-primary branches is exceeded") => {
WakeupFailureKind::QuotaExceeded
}
ControlPlaneError {
http_status_code: StatusCode::LOCKED,
..
} => WakeupFailureKind::ApiConsoleLocked,
ControlPlaneError {
http_status_code: StatusCode::BAD_REQUEST,
..
} => WakeupFailureKind::ApiConsoleBadRequest,
ControlPlaneError {
http_status_code, ..
} if http_status_code.is_server_error() => {
WakeupFailureKind::ApiConsoleOtherServerError
}
ControlPlaneError { .. } => WakeupFailureKind::ApiConsoleOtherError,
},
},
WakeComputeError::TooManyConnections => WakeupFailureKind::ApiConsoleLocked,
WakeComputeError::TooManyConnectionAttempts(_) => WakeupFailureKind::TimeoutError,
};
let kind = e.get_error_kind();
Metrics::get()
.proxy
.connection_failures_breakdown

View File

@@ -250,7 +250,7 @@ mod tests {
use super::{BucketRateLimiter, WakeComputeRateLimiter};
use crate::intern::EndpointIdInt;
use crate::rate_limiter::RateBucketInfo;
use crate::EndpointId;
use crate::types::EndpointId;
#[test]
fn rate_bucket_rpi() {

View File

@@ -271,7 +271,7 @@ mod tests {
use serde_json::json;
use super::*;
use crate::{ProjectId, RoleName};
use crate::types::{ProjectId, RoleName};
#[test]
fn parse_allowed_ips() -> anyhow::Result<()> {

View File

@@ -62,7 +62,7 @@ mod tests {
use super::{Exchange, ServerSecret};
use crate::intern::EndpointIdInt;
use crate::sasl::{Mechanism, Step};
use crate::EndpointId;
use crate::types::EndpointId;
#[test]
fn snapshot() {

View File

@@ -189,7 +189,7 @@ impl Drop for JobHandle {
#[cfg(test)]
mod tests {
use super::*;
use crate::EndpointId;
use crate::types::EndpointId;
#[tokio::test]
async fn hash_is_correct() {

View File

@@ -18,6 +18,7 @@ use super::local_conn_pool::{self, LocalClient, LocalConnPool, EXT_NAME, EXT_SCH
use crate::auth::backend::local::StaticAuthRules;
use crate::auth::backend::{ComputeCredentials, ComputeUserInfo};
use crate::auth::{self, check_peer_addr_is_in_list, AuthError};
use crate::compute;
use crate::compute_ctl::{
ComputeCtlError, ExtensionInstallRequest, Privilege, SetRoleGrantsRequest,
};
@@ -32,7 +33,7 @@ use crate::intern::EndpointIdInt;
use crate::proxy::connect_compute::ConnectMechanism;
use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute};
use crate::rate_limiter::EndpointRateLimiter;
use crate::{compute, EndpointId, Host};
use crate::types::{EndpointId, Host};
pub(crate) struct PoolingBackend {
pub(crate) http_conn_pool: Arc<super::http_conn_pool::GlobalConnPool<Send>>,

View File

@@ -211,7 +211,7 @@ mod tests {
use super::*;
use crate::proxy::NeonOptions;
use crate::serverless::cancel_set::CancelSet;
use crate::{BranchId, EndpointId, ProjectId};
use crate::types::{BranchId, EndpointId, ProjectId};
struct MockClient(Arc<AtomicBool>);
impl MockClient {

View File

@@ -16,8 +16,8 @@ use crate::auth::backend::ComputeUserInfo;
use crate::context::RequestMonitoring;
use crate::control_plane::messages::ColdStartInfo;
use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
use crate::types::{DbName, EndpointCacheKey, RoleName};
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
use crate::{DbName, EndpointCacheKey, RoleName};
#[derive(Debug, Clone)]
pub(crate) struct ConnInfo {

View File

@@ -14,8 +14,8 @@ use super::conn_pool_lib::{ClientInnerExt, ConnInfo};
use crate::context::RequestMonitoring;
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
use crate::types::EndpointCacheKey;
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
use crate::EndpointCacheKey;
pub(crate) type Send = http2::SendRequest<hyper::body::Incoming>;
pub(crate) type Connect =

View File

@@ -35,8 +35,8 @@ use super::conn_pool_lib::{ClientInnerExt, ConnInfo};
use crate::context::RequestMonitoring;
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::Metrics;
use crate::types::{DbName, RoleName};
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
use crate::{DbName, RoleName};
pub(crate) const EXT_NAME: &str = "pg_session_jwt";
pub(crate) const EXT_VERSION: &str = "0.1.2";

View File

@@ -38,8 +38,8 @@ use crate::error::{ErrorKind, ReportableError, UserFacingError};
use crate::metrics::{HttpDirection, Metrics};
use crate::proxy::{run_until_cancelled, NeonOptions};
use crate::serverless::backend::HttpConnError;
use crate::types::{DbName, RoleName};
use crate::usage_metrics::{MetricCounter, MetricCounterRecorder};
use crate::{DbName, RoleName};
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]

39
proxy/src/signals.rs Normal file
View File

@@ -0,0 +1,39 @@
use std::convert::Infallible;
use anyhow::bail;
use tokio_util::sync::CancellationToken;
use tracing::warn;
/// Handle unix signals appropriately.
pub async fn handle<F>(
token: CancellationToken,
mut refresh_config: F,
) -> anyhow::Result<Infallible>
where
F: FnMut(),
{
use tokio::signal::unix::{signal, SignalKind};
let mut hangup = signal(SignalKind::hangup())?;
let mut interrupt = signal(SignalKind::interrupt())?;
let mut terminate = signal(SignalKind::terminate())?;
loop {
tokio::select! {
// Hangup is commonly used for config reload.
_ = hangup.recv() => {
warn!("received SIGHUP");
refresh_config();
}
// Shut down the whole application.
_ = interrupt.recv() => {
warn!("received SIGINT, exiting immediately");
bail!("interrupted");
}
_ = terminate.recv() => {
warn!("received SIGTERM, shutting down once all existing connections have closed");
token.cancel();
}
}
}
}

122
proxy/src/types.rs Normal file
View File

@@ -0,0 +1,122 @@
use crate::intern::{EndpointIdInt, EndpointIdTag, InternId};
macro_rules! smol_str_wrapper {
($name:ident) => {
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct $name(smol_str::SmolStr);
impl $name {
#[allow(unused)]
pub(crate) fn as_str(&self) -> &str {
self.0.as_str()
}
}
impl std::fmt::Display for $name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl<T> std::cmp::PartialEq<T> for $name
where
smol_str::SmolStr: std::cmp::PartialEq<T>,
{
fn eq(&self, other: &T) -> bool {
self.0.eq(other)
}
}
impl<T> From<T> for $name
where
smol_str::SmolStr: From<T>,
{
fn from(x: T) -> Self {
Self(x.into())
}
}
impl AsRef<str> for $name {
fn as_ref(&self) -> &str {
self.0.as_ref()
}
}
impl std::ops::Deref for $name {
type Target = str;
fn deref(&self) -> &str {
&*self.0
}
}
impl<'de> serde::de::Deserialize<'de> for $name {
fn deserialize<D: serde::de::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
<smol_str::SmolStr as serde::de::Deserialize<'de>>::deserialize(d).map(Self)
}
}
impl serde::Serialize for $name {
fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
self.0.serialize(s)
}
}
};
}
const POOLER_SUFFIX: &str = "-pooler";
impl EndpointId {
#[must_use]
pub fn normalize(&self) -> Self {
if let Some(stripped) = self.as_ref().strip_suffix(POOLER_SUFFIX) {
stripped.into()
} else {
self.clone()
}
}
#[must_use]
pub fn normalize_intern(&self) -> EndpointIdInt {
if let Some(stripped) = self.as_ref().strip_suffix(POOLER_SUFFIX) {
EndpointIdTag::get_interner().get_or_intern(stripped)
} else {
self.into()
}
}
}
// 90% of role name strings are 20 characters or less.
smol_str_wrapper!(RoleName);
// 50% of endpoint strings are 23 characters or less.
smol_str_wrapper!(EndpointId);
// 50% of branch strings are 23 characters or less.
smol_str_wrapper!(BranchId);
// 90% of project strings are 23 characters or less.
smol_str_wrapper!(ProjectId);
// will usually equal endpoint ID
smol_str_wrapper!(EndpointCacheKey);
smol_str_wrapper!(DbName);
// postgres hostname, will likely be a port:ip addr
smol_str_wrapper!(Host);
// Endpoints are a bit tricky. Rare they might be branches or projects.
impl EndpointId {
pub(crate) fn is_endpoint(&self) -> bool {
self.0.starts_with("ep-")
}
pub(crate) fn is_branch(&self) -> bool {
self.0.starts_with("br-")
}
// pub(crate) fn is_project(&self) -> bool {
// !self.is_endpoint() && !self.is_branch()
// }
pub(crate) fn as_branch(&self) -> BranchId {
BranchId(self.0.clone())
}
pub(crate) fn as_project(&self) -> ProjectId {
ProjectId(self.0.clone())
}
}

View File

@@ -497,7 +497,8 @@ mod tests {
use url::Url;
use super::*;
use crate::{http, BranchId, EndpointId};
use crate::http;
use crate::types::{BranchId, EndpointId};
#[tokio::test]
async fn metrics() {

View File

@@ -66,22 +66,25 @@ impl FileStorage {
})
}
/// Create file storage for a new timeline, but don't persist it yet.
pub fn create_new(
timeline_dir: Utf8PathBuf,
/// Create and reliably persist new control file at given location.
///
/// Note: we normally call this in temp directory for atomic init, so
/// interested in FileStorage as a result only in tests.
pub async fn create_new(
dir: Utf8PathBuf,
conf: &SafeKeeperConf,
state: TimelinePersistentState,
) -> Result<FileStorage> {
// we don't support creating new timelines in offloaded state
assert!(matches!(state.eviction_state, EvictionState::Present));
let store = FileStorage {
timeline_dir,
let mut store = FileStorage {
timeline_dir: dir,
no_sync: conf.no_sync,
state,
state: state.clone(),
last_persist_at: Instant::now(),
};
store.persist(&state).await?;
Ok(store)
}
@@ -190,8 +193,6 @@ impl TimelinePersistentState {
impl Storage for FileStorage {
/// Persists state durably to the underlying storage.
///
/// For a description, see <https://lwn.net/Articles/457667/>.
async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
@@ -269,7 +270,7 @@ mod test {
.await
.expect("failed to create timeline dir");
let state = TimelinePersistentState::empty();
let storage = FileStorage::create_new(timeline_dir, conf, state.clone())?;
let storage = FileStorage::create_new(timeline_dir, conf, state.clone()).await?;
Ok((storage, state))
}

View File

@@ -12,10 +12,10 @@ use tracing::{info, warn};
use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::{
control_file::{FileStorage, Storage},
pull_timeline::{create_temp_timeline_dir, load_temp_timeline, validate_temp_timeline},
control_file::FileStorage,
state::TimelinePersistentState,
timeline::{Timeline, TimelineError, WalResidentTimeline},
timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline},
wal_backup::copy_s3_segments,
wal_storage::{wal_file_paths, WalReader},
GlobalTimelines,
@@ -149,17 +149,16 @@ pub async fn handle_request(request: Request) -> Result<()> {
vec![],
request.until_lsn,
start_lsn,
);
)?;
new_state.timeline_start_lsn = start_lsn;
new_state.peer_horizon_lsn = request.until_lsn;
new_state.backup_lsn = new_backup_lsn;
let mut file_storage = FileStorage::create_new(tli_dir_path.clone(), conf, new_state.clone())?;
file_storage.persist(&new_state).await?;
FileStorage::create_new(tli_dir_path.clone(), conf, new_state.clone()).await?;
// now we have a ready timeline in a temp directory
validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
load_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
GlobalTimelines::load_temp_timeline(request.destination_ttid, &tli_dir_path, true).await?;
Ok(())
}

View File

@@ -1,7 +1,6 @@
use anyhow::{anyhow, bail, Context, Result};
use bytes::Bytes;
use camino::Utf8PathBuf;
use camino_tempfile::Utf8TempDir;
use chrono::{DateTime, Utc};
use futures::{SinkExt, StreamExt, TryStreamExt};
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
@@ -9,7 +8,6 @@ use serde::{Deserialize, Serialize};
use std::{
cmp::min,
io::{self, ErrorKind},
sync::Arc,
};
use tokio::{fs::OpenOptions, io::AsyncWrite, sync::mpsc, task};
use tokio_tar::{Archive, Builder, Header};
@@ -20,7 +18,7 @@ use tokio_util::{
use tracing::{error, info, instrument};
use crate::{
control_file::{self, CONTROL_FILE_NAME},
control_file::CONTROL_FILE_NAME,
debug_dump,
http::{
client::{self, Client},
@@ -28,13 +26,14 @@ use crate::{
},
safekeeper::Term,
state::TimelinePersistentState,
timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError, WalResidentTimeline},
timeline::WalResidentTimeline,
timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline},
wal_backup,
wal_storage::{self, open_wal_file, Storage},
GlobalTimelines, SafeKeeperConf,
wal_storage::open_wal_file,
GlobalTimelines,
};
use utils::{
crashsafe::{durable_rename, fsync_async_opt},
crashsafe::fsync_async_opt,
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
logging::SecretString,
lsn::Lsn,
@@ -428,100 +427,9 @@ async fn pull_timeline(
assert!(status.commit_lsn <= status.flush_lsn);
// Finally, load the timeline.
let _tli = load_temp_timeline(conf, ttid, &tli_dir_path).await?;
let _tli = GlobalTimelines::load_temp_timeline(ttid, &tli_dir_path, false).await?;
Ok(Response {
safekeeper_host: host,
})
}
/// Create temp directory for a new timeline. It needs to be located on the same
/// filesystem as the rest of the timelines. It will be automatically deleted when
/// Utf8TempDir goes out of scope.
pub async fn create_temp_timeline_dir(
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
) -> Result<(Utf8TempDir, Utf8PathBuf)> {
// conf.workdir is usually /storage/safekeeper/data
// will try to transform it into /storage/safekeeper/tmp
let temp_base = conf
.workdir
.parent()
.ok_or(anyhow::anyhow!("workdir has no parent"))?
.join("tmp");
tokio::fs::create_dir_all(&temp_base).await?;
let tli_dir = camino_tempfile::Builder::new()
.suffix("_temptli")
.prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
.tempdir_in(temp_base)?;
let tli_dir_path = tli_dir.path().to_path_buf();
Ok((tli_dir, tli_dir_path))
}
/// Do basic validation of a temp timeline, before moving it to the global map.
pub async fn validate_temp_timeline(
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
path: &Utf8PathBuf,
) -> Result<(Lsn, Lsn)> {
let control_path = path.join("safekeeper.control");
let control_store = control_file::FileStorage::load_control_file(control_path)?;
if control_store.server.wal_seg_size == 0 {
bail!("wal_seg_size is not set");
}
let wal_store = wal_storage::PhysicalStorage::new(&ttid, path.clone(), conf, &control_store)?;
let commit_lsn = control_store.commit_lsn;
let flush_lsn = wal_store.flush_lsn();
Ok((commit_lsn, flush_lsn))
}
/// Move timeline from a temp directory to the main storage, and load it to the global map.
///
/// This operation is done under a lock to prevent bugs if several concurrent requests are
/// trying to load the same timeline. Note that it doesn't guard against creating the
/// timeline with the same ttid, but no one should be doing this anyway.
pub async fn load_temp_timeline(
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
tmp_path: &Utf8PathBuf,
) -> Result<Arc<Timeline>> {
// Take a lock to prevent concurrent loadings
let load_lock = GlobalTimelines::loading_lock().await;
let guard = load_lock.lock().await;
if !matches!(GlobalTimelines::get(ttid), Err(TimelineError::NotFound(_))) {
bail!("timeline already exists, cannot overwrite it")
}
// Move timeline dir to the correct location
let timeline_path = get_timeline_dir(conf, &ttid);
info!(
"moving timeline {} from {} to {}",
ttid, tmp_path, timeline_path
);
tokio::fs::create_dir_all(get_tenant_dir(conf, &ttid.tenant_id)).await?;
// fsync tenant dir creation
fsync_async_opt(&conf.workdir, !conf.no_sync).await?;
durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
let tli = GlobalTimelines::load_timeline(&guard, ttid)
.await
.context("Failed to load timeline after copy")?;
info!(
"loaded timeline {}, flush_lsn={}",
ttid,
tli.get_flush_lsn().await
);
Ok(tli)
}

View File

@@ -339,7 +339,8 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
};
let tli =
GlobalTimelines::create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID)
.await?;
.await
.context("create timeline")?;
tli.wal_residence_guard().await?
}
_ => {

View File

@@ -3,7 +3,7 @@
use std::{cmp::max, ops::Deref};
use anyhow::Result;
use anyhow::{bail, Result};
use safekeeper_api::models::TimelineTermBumpResponse;
use serde::{Deserialize, Serialize};
use utils::{
@@ -13,7 +13,11 @@ use utils::{
use crate::{
control_file,
safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, Term, TermHistory},
safekeeper::{
AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, Term, TermHistory,
UNKNOWN_SERVER_VERSION,
},
timeline::TimelineError,
wal_backup_partial::{self},
};
@@ -91,8 +95,24 @@ impl TimelinePersistentState {
peers: Vec<NodeId>,
commit_lsn: Lsn,
local_start_lsn: Lsn,
) -> TimelinePersistentState {
TimelinePersistentState {
) -> anyhow::Result<TimelinePersistentState> {
if server_info.wal_seg_size == 0 {
bail!(TimelineError::UninitializedWalSegSize(*ttid));
}
if server_info.pg_version == UNKNOWN_SERVER_VERSION {
bail!(TimelineError::UninitialinzedPgVersion(*ttid));
}
if commit_lsn < local_start_lsn {
bail!(
"commit_lsn {} is smaller than local_start_lsn {}",
commit_lsn,
local_start_lsn
);
}
Ok(TimelinePersistentState {
tenant_id: ttid.tenant_id,
timeline_id: ttid.timeline_id,
acceptor_state: AcceptorState {
@@ -115,24 +135,23 @@ impl TimelinePersistentState {
),
partial_backup: wal_backup_partial::State::default(),
eviction_state: EvictionState::Present,
}
})
}
#[cfg(test)]
pub fn empty() -> Self {
use crate::safekeeper::UNKNOWN_SERVER_VERSION;
TimelinePersistentState::new(
&TenantTimelineId::empty(),
ServerInfo {
pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
system_id: 0, /* Postgres system identifier */
wal_seg_size: 0,
pg_version: 17, /* Postgres server version */
system_id: 0, /* Postgres system identifier */
wal_seg_size: 16 * 1024 * 1024,
},
vec![],
Lsn::INVALID,
Lsn::INVALID,
)
.unwrap()
}
}

View File

@@ -27,11 +27,11 @@ use utils::{
use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use crate::control_file;
use crate::rate_limit::RateLimiter;
use crate::receive_wal::WalReceivers;
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, ServerInfo, Term, TermLsn,
INVALID_TERM,
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, Term, TermLsn,
};
use crate::send_wal::WalSenders;
use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, TimelineState};
@@ -40,7 +40,6 @@ use crate::timeline_manager::{AtomicStatus, ManagerCtl};
use crate::timelines_set::TimelinesSet;
use crate::wal_backup::{self, remote_timeline_path};
use crate::wal_backup_partial::PartialRemoteSegment;
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
use crate::metrics::{FullTimelineInfo, WalStorageMetrics, MISC_OPERATION_SECONDS};
use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
@@ -326,44 +325,6 @@ pub struct SharedState {
}
impl SharedState {
/// Initialize fresh timeline state without persisting anything to disk.
fn create_new(
conf: &SafeKeeperConf,
ttid: &TenantTimelineId,
state: TimelinePersistentState,
) -> Result<Self> {
if state.server.wal_seg_size == 0 {
bail!(TimelineError::UninitializedWalSegSize(*ttid));
}
if state.server.pg_version == UNKNOWN_SERVER_VERSION {
bail!(TimelineError::UninitialinzedPgVersion(*ttid));
}
if state.commit_lsn < state.local_start_lsn {
bail!(
"commit_lsn {} is higher than local_start_lsn {}",
state.commit_lsn,
state.local_start_lsn
);
}
// We don't want to write anything to disk, because we may have existing timeline there.
// These functions should not change anything on disk.
let timeline_dir = get_timeline_dir(conf, ttid);
let control_store =
control_file::FileStorage::create_new(timeline_dir.clone(), conf, state)?;
let wal_store =
wal_storage::PhysicalStorage::new(ttid, timeline_dir, conf, &control_store)?;
let sk = SafeKeeper::new(TimelineState::new(control_store), wal_store, conf.my_id)?;
Ok(Self {
sk: StateSK::Loaded(sk),
peers_info: PeersInfo(vec![]),
wal_removal_on_hold: false,
})
}
/// Restore SharedState from control file. If file doesn't exist, bails out.
fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
let timeline_dir = get_timeline_dir(conf, ttid);
@@ -450,6 +411,8 @@ pub enum TimelineError {
Cancelled(TenantTimelineId),
#[error("Timeline {0} was not found in global map")]
NotFound(TenantTimelineId),
#[error("Timeline {0} creation is in progress")]
CreationInProgress(TenantTimelineId),
#[error("Timeline {0} exists on disk, but wasn't loaded on startup")]
Invalid(TenantTimelineId),
#[error("Timeline {0} is already exists")]
@@ -514,7 +477,7 @@ pub struct Timeline {
impl Timeline {
/// Load existing timeline from disk.
pub fn load_timeline(conf: &SafeKeeperConf, ttid: TenantTimelineId) -> Result<Timeline> {
pub fn load_timeline(conf: &SafeKeeperConf, ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
let shared_state = SharedState::restore(conf, &ttid)?;
@@ -528,7 +491,7 @@ impl Timeline {
let walreceivers = WalReceivers::new();
let remote_path = remote_timeline_path(&ttid)?;
Ok(Timeline {
Ok(Arc::new(Timeline {
ttid,
remote_path,
commit_lsn_watch_tx,
@@ -547,47 +510,7 @@ impl Timeline {
wal_backup_active: AtomicBool::new(false),
last_removed_segno: AtomicU64::new(0),
mgr_status: AtomicStatus::new(),
})
}
/// Create a new timeline, which is not yet persisted to disk.
pub fn create_empty(
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
server_info: ServerInfo,
commit_lsn: Lsn,
local_start_lsn: Lsn,
) -> Result<Timeline> {
let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) =
watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID)));
let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
let state =
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
let walreceivers = WalReceivers::new();
let remote_path = remote_timeline_path(&ttid)?;
Ok(Timeline {
ttid,
remote_path,
commit_lsn_watch_tx,
commit_lsn_watch_rx,
term_flush_lsn_watch_tx,
term_flush_lsn_watch_rx,
shared_state_version_tx,
shared_state_version_rx,
mutex: RwLock::new(SharedState::create_new(conf, &ttid, state)?),
walsenders: WalSenders::new(walreceivers.clone()),
walreceivers,
cancel: CancellationToken::default(),
timeline_dir: get_timeline_dir(conf, &ttid),
manager_ctl: ManagerCtl::new(),
broker_active: AtomicBool::new(false),
wal_backup_active: AtomicBool::new(false),
last_removed_segno: AtomicU64::new(0),
mgr_status: AtomicStatus::new(),
})
}))
}
/// Initialize fresh timeline on disk and start background tasks. If init

View File

@@ -5,11 +5,14 @@
use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
use crate::rate_limit::RateLimiter;
use crate::safekeeper::ServerInfo;
use crate::state::TimelinePersistentState;
use crate::timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError};
use crate::timelines_set::TimelinesSet;
use crate::SafeKeeperConf;
use crate::wal_storage::Storage;
use crate::{control_file, wal_storage, SafeKeeperConf};
use anyhow::{bail, Context, Result};
use camino::Utf8PathBuf;
use camino_tempfile::Utf8TempDir;
use once_cell::sync::Lazy;
use serde::Serialize;
use std::collections::HashMap;
@@ -17,12 +20,22 @@ use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::fs;
use tracing::*;
use utils::crashsafe::{durable_rename, fsync_async_opt};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
// Timeline entry in the global map: either a ready timeline, or mark that it is
// being created.
#[derive(Clone)]
enum GlobalMapTimeline {
CreationInProgress,
Timeline(Arc<Timeline>),
}
struct GlobalTimelinesState {
timelines: HashMap<TenantTimelineId, Arc<Timeline>>,
timelines: HashMap<TenantTimelineId, GlobalMapTimeline>,
// A tombstone indicates this timeline used to exist has been deleted. These are used to prevent
// on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
@@ -31,13 +44,9 @@ struct GlobalTimelinesState {
conf: Option<SafeKeeperConf>,
broker_active_set: Arc<TimelinesSet>,
load_lock: Arc<tokio::sync::Mutex<TimelineLoadLock>>,
global_rate_limiter: RateLimiter,
}
// Used to prevent concurrent timeline loading.
pub struct TimelineLoadLock;
impl GlobalTimelinesState {
/// Get configuration, which must be set once during init.
fn get_conf(&self) -> &SafeKeeperConf {
@@ -55,22 +64,16 @@ impl GlobalTimelinesState {
)
}
/// Insert timeline into the map. Returns error if timeline with the same id already exists.
fn try_insert(&mut self, timeline: Arc<Timeline>) -> Result<()> {
let ttid = timeline.ttid;
if self.timelines.contains_key(&ttid) {
bail!(TimelineError::AlreadyExists(ttid));
}
self.timelines.insert(ttid, timeline);
Ok(())
}
/// Get timeline from the map. Returns error if timeline doesn't exist.
/// Get timeline from the map. Returns error if timeline doesn't exist or
/// creation is in progress.
fn get(&self, ttid: &TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
self.timelines
.get(ttid)
.cloned()
.ok_or(TimelineError::NotFound(*ttid))
match self.timelines.get(ttid).cloned() {
Some(GlobalMapTimeline::Timeline(tli)) => Ok(tli),
Some(GlobalMapTimeline::CreationInProgress) => {
Err(TimelineError::CreationInProgress(*ttid))
}
None => Err(TimelineError::NotFound(*ttid)),
}
}
fn delete(&mut self, ttid: TenantTimelineId) {
@@ -85,7 +88,6 @@ static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
tombstones: HashMap::new(),
conf: None,
broker_active_set: Arc::new(TimelinesSet::default()),
load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)),
global_rate_limiter: RateLimiter::new(1, 1),
})
});
@@ -141,11 +143,10 @@ impl GlobalTimelines {
/// Loads all timelines for the given tenant to memory. Returns fs::read_dir
/// errors if any.
///
/// It is async for update_status_notify sake. Since TIMELINES_STATE lock is
/// sync and there is no important reason to make it async (it is always
/// held for a short while) we just lock and unlock it for each timeline --
/// this function is called during init when nothing else is running, so
/// this is fine.
/// It is async, but TIMELINES_STATE lock is sync and there is no important
/// reason to make it async (it is always held for a short while), so we
/// just lock and unlock it for each timeline -- this function is called
/// during init when nothing else is running, so this is fine.
async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
let (conf, broker_active_set, partial_backup_rate_limiter) = {
let state = TIMELINES_STATE.lock().unwrap();
@@ -163,14 +164,13 @@ impl GlobalTimelines {
{
let ttid = TenantTimelineId::new(tenant_id, timeline_id);
match Timeline::load_timeline(&conf, ttid) {
Ok(timeline) => {
let tli = Arc::new(timeline);
Ok(tli) => {
let mut shared_state = tli.write_shared_state().await;
TIMELINES_STATE
.lock()
.unwrap()
.timelines
.insert(ttid, tli.clone());
.insert(ttid, GlobalMapTimeline::Timeline(tli.clone()));
tli.bootstrap(
&mut shared_state,
&conf,
@@ -199,51 +199,6 @@ impl GlobalTimelines {
Ok(())
}
/// Take a lock for timeline loading.
pub async fn loading_lock() -> Arc<tokio::sync::Mutex<TimelineLoadLock>> {
TIMELINES_STATE.lock().unwrap().load_lock.clone()
}
/// Load timeline from disk to the memory.
pub async fn load_timeline<'a>(
_guard: &tokio::sync::MutexGuard<'a, TimelineLoadLock>,
ttid: TenantTimelineId,
) -> Result<Arc<Timeline>> {
let (conf, broker_active_set, partial_backup_rate_limiter) =
TIMELINES_STATE.lock().unwrap().get_dependencies();
match Timeline::load_timeline(&conf, ttid) {
Ok(timeline) => {
let tli = Arc::new(timeline);
let mut shared_state = tli.write_shared_state().await;
// TODO: prevent concurrent timeline creation/loading
{
let mut state = TIMELINES_STATE.lock().unwrap();
// We may be have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`). We trust
// that the human doing this manual intervention knows what they are doing, and remove its tombstone.
if state.tombstones.remove(&ttid).is_some() {
warn!("Un-deleted timeline {ttid}");
}
state.timelines.insert(ttid, tli.clone());
}
tli.bootstrap(
&mut shared_state,
&conf,
broker_active_set,
partial_backup_rate_limiter,
);
drop(shared_state);
Ok(tli)
}
// If we can't load a timeline, it's bad. Caller will figure it out.
Err(e) => bail!("failed to load timeline {}, reason: {:?}", ttid, e),
}
}
/// Get the number of timelines in the map.
pub fn timelines_count() -> usize {
TIMELINES_STATE.lock().unwrap().timelines.len()
@@ -266,7 +221,7 @@ impl GlobalTimelines {
commit_lsn: Lsn,
local_start_lsn: Lsn,
) -> Result<Arc<Timeline>> {
let (conf, broker_active_set, partial_backup_rate_limiter) = {
let (conf, _, _) = {
let state = TIMELINES_STATE.lock().unwrap();
if let Ok(timeline) = state.get(&ttid) {
// Timeline already exists, return it.
@@ -282,55 +237,146 @@ impl GlobalTimelines {
info!("creating new timeline {}", ttid);
let timeline = Arc::new(Timeline::create_empty(
&conf,
ttid,
server_info,
commit_lsn,
local_start_lsn,
)?);
// Do on disk initialization in tmp dir.
let (_tmp_dir, tmp_dir_path) = create_temp_timeline_dir(&conf, ttid).await?;
// Take a lock and finish the initialization holding this mutex. No other threads
// can interfere with creation after we will insert timeline into the map.
{
let mut shared_state = timeline.write_shared_state().await;
// TODO: currently we create only cfile. It would be reasonable to
// immediately initialize first WAL segment as well.
let state =
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn)?;
control_file::FileStorage::create_new(tmp_dir_path.clone(), &conf, state).await?;
let timeline = GlobalTimelines::load_temp_timeline(ttid, &tmp_dir_path, true).await?;
Ok(timeline)
}
// We can get a race condition here in case of concurrent create calls, but only
// in theory. create() will return valid timeline on the next try.
TIMELINES_STATE
.lock()
.unwrap()
.try_insert(timeline.clone())?;
/// Move timeline from a temp directory to the main storage, and load it to
/// the global map. Creating timeline in this way ensures atomicity: rename
/// is atomic, so either move of the whole datadir succeeds or it doesn't,
/// but corrupted data dir shouldn't be possible.
///
/// We'd like to avoid holding map lock while doing IO, so it's a 3 step
/// process:
/// 1) check the global map that timeline doesn't exist and mark that we're
/// creating it;
/// 2) move the directory and load the timeline
/// 3) take lock again and insert the timeline into the global map.
pub async fn load_temp_timeline(
ttid: TenantTimelineId,
tmp_path: &Utf8PathBuf,
check_tombstone: bool,
) -> Result<Arc<Timeline>> {
// Check for existence and mark that we're creating it.
let (conf, broker_active_set, partial_backup_rate_limiter) = {
let mut state = TIMELINES_STATE.lock().unwrap();
match state.timelines.get(&ttid) {
Some(GlobalMapTimeline::CreationInProgress) => {
bail!(TimelineError::CreationInProgress(ttid));
}
Some(GlobalMapTimeline::Timeline(_)) => {
bail!(TimelineError::AlreadyExists(ttid));
}
_ => {}
}
if check_tombstone {
if state.tombstones.contains_key(&ttid) {
anyhow::bail!("timeline {ttid} is deleted, refusing to recreate");
}
} else {
// We may be have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`). We trust
// that the human doing this manual intervention knows what they are doing, and remove its tombstone.
if state.tombstones.remove(&ttid).is_some() {
warn!("un-deleted timeline {ttid}");
}
}
state
.timelines
.insert(ttid, GlobalMapTimeline::CreationInProgress);
state.get_dependencies()
};
// Write the new timeline to the disk and start background workers.
// Bootstrap is transactional, so if it fails, the timeline will be deleted,
// and the state on disk should remain unchanged.
if let Err(e) = timeline
.init_new(
&mut shared_state,
// Do the actual move and reflect the result in the map.
match GlobalTimelines::install_temp_timeline(ttid, tmp_path, &conf).await {
Ok(timeline) => {
let mut timeline_shared_state = timeline.write_shared_state().await;
let mut state = TIMELINES_STATE.lock().unwrap();
assert!(matches!(
state.timelines.get(&ttid),
Some(GlobalMapTimeline::CreationInProgress)
));
state
.timelines
.insert(ttid, GlobalMapTimeline::Timeline(timeline.clone()));
drop(state);
timeline.bootstrap(
&mut timeline_shared_state,
&conf,
broker_active_set,
partial_backup_rate_limiter,
)
.await
{
// Note: the most likely reason for init failure is that the timeline
// directory already exists on disk. This happens when timeline is corrupted
// and wasn't loaded from disk on startup because of that. We want to preserve
// the timeline directory in this case, for further inspection.
// TODO: this is an unusual error, perhaps we should send it to sentry
// TODO: compute will try to create timeline every second, we should add backoff
error!("failed to init new timeline {}: {}", ttid, e);
// Timeline failed to init, it cannot be used. Remove it from the map.
TIMELINES_STATE.lock().unwrap().timelines.remove(&ttid);
return Err(e);
);
drop(timeline_shared_state);
Ok(timeline)
}
Err(e) => {
// Init failed, remove the marker from the map
let mut state = TIMELINES_STATE.lock().unwrap();
assert!(matches!(
state.timelines.get(&ttid),
Some(GlobalMapTimeline::CreationInProgress)
));
state.timelines.remove(&ttid);
Err(e)
}
// We are done with bootstrap, release the lock, return the timeline.
// {} block forces release before .await
}
Ok(timeline)
}
/// Main part of load_temp_timeline: do the move and load.
async fn install_temp_timeline(
ttid: TenantTimelineId,
tmp_path: &Utf8PathBuf,
conf: &SafeKeeperConf,
) -> Result<Arc<Timeline>> {
let tenant_path = get_tenant_dir(conf, &ttid.tenant_id);
let timeline_path = get_timeline_dir(conf, &ttid);
// We must have already checked that timeline doesn't exist in the map,
// but there might be existing datadir: if timeline is corrupted it is
// not loaded. We don't want to overwrite such a dir, so check for its
// existence.
match fs::metadata(&timeline_path).await {
Ok(_) => {
// Timeline directory exists on disk, we should leave state unchanged
// and return error.
bail!(TimelineError::Invalid(ttid));
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => {
return Err(e.into());
}
}
info!(
"moving timeline {} from {} to {}",
ttid, tmp_path, timeline_path
);
// Now it is safe to move the timeline directory to the correct
// location. First, create tenant directory. Ignore error if it already
// exists.
if let Err(e) = tokio::fs::create_dir(&tenant_path).await {
if e.kind() != std::io::ErrorKind::AlreadyExists {
return Err(e.into());
}
}
// fsync it
fsync_async_opt(&tenant_path, !conf.no_sync).await?;
// and its creation
fsync_async_opt(&conf.workdir, !conf.no_sync).await?;
// Do the move.
durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
Timeline::load_timeline(conf, ttid)
}
/// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
@@ -358,8 +404,16 @@ impl GlobalTimelines {
global_lock
.timelines
.values()
.filter(|t| !t.is_cancelled())
.cloned()
.filter_map(|t| match t {
GlobalMapTimeline::Timeline(t) => {
if t.is_cancelled() {
None
} else {
Some(t.clone())
}
}
_ => None,
})
.collect()
}
@@ -370,8 +424,11 @@ impl GlobalTimelines {
global_lock
.timelines
.values()
.filter_map(|t| match t {
GlobalMapTimeline::Timeline(t) => Some(t.clone()),
_ => None,
})
.filter(|t| t.ttid.tenant_id == tenant_id)
.cloned()
.collect()
}
@@ -504,3 +561,45 @@ fn delete_dir(path: Utf8PathBuf) -> Result<bool> {
Err(e) => Err(e.into()),
}
}
/// Create temp directory for a new timeline. It needs to be located on the same
/// filesystem as the rest of the timelines. It will be automatically deleted when
/// Utf8TempDir goes out of scope.
pub async fn create_temp_timeline_dir(
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
) -> Result<(Utf8TempDir, Utf8PathBuf)> {
let temp_base = conf.workdir.join("tmp");
tokio::fs::create_dir_all(&temp_base).await?;
let tli_dir = camino_tempfile::Builder::new()
.suffix("_temptli")
.prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
.tempdir_in(temp_base)?;
let tli_dir_path = tli_dir.path().to_path_buf();
Ok((tli_dir, tli_dir_path))
}
/// Do basic validation of a temp timeline, before moving it to the global map.
pub async fn validate_temp_timeline(
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
path: &Utf8PathBuf,
) -> Result<(Lsn, Lsn)> {
let control_path = path.join("safekeeper.control");
let control_store = control_file::FileStorage::load_control_file(control_path)?;
if control_store.server.wal_seg_size == 0 {
bail!("wal_seg_size is not set");
}
let wal_store = wal_storage::PhysicalStorage::new(&ttid, path.clone(), conf, &control_store)?;
let commit_lsn = control_store.commit_lsn;
let flush_lsn = wal_store.flush_lsn();
Ok((commit_lsn, flush_lsn))
}

View File

@@ -186,8 +186,14 @@ impl PhysicalStorage {
"initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
ttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn,
);
if flush_lsn < state.commit_lsn || flush_lsn < state.peer_horizon_lsn {
warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or peer_horizon_lsn from control file", ttid.timeline_id);
if flush_lsn < state.commit_lsn {
bail!("timeline {} potential data loss: flush_lsn {} by find_end_of_wal is less than commit_lsn {} from control file", ttid.timeline_id, flush_lsn, state.commit_lsn);
}
if flush_lsn < state.peer_horizon_lsn {
warn!(
"timeline {}: flush_lsn {} is less than cfile peer_horizon_lsn {}",
ttid.timeline_id, flush_lsn, state.peer_horizon_lsn
);
}
Ok(PhysicalStorage {

View File

@@ -59,7 +59,7 @@ impl GlobalMap {
if state.commit_lsn < state.local_start_lsn {
bail!(
"commit_lsn {} is higher than local_start_lsn {}",
"commit_lsn {} is smaller than local_start_lsn {}",
state.commit_lsn,
state.local_start_lsn
);
@@ -96,23 +96,7 @@ impl GlobalMap {
let local_start_lsn = Lsn::INVALID;
let state =
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
if state.server.wal_seg_size == 0 {
bail!(TimelineError::UninitializedWalSegSize(ttid));
}
if state.server.pg_version == UNKNOWN_SERVER_VERSION {
bail!(TimelineError::UninitialinzedPgVersion(ttid));
}
if state.commit_lsn < state.local_start_lsn {
bail!(
"commit_lsn {} is higher than local_start_lsn {}",
state.commit_lsn,
state.local_start_lsn
);
}
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn)?;
let disk_timeline = self.disk.put_state(&ttid, state);
let control_store = DiskStateStorage::new(disk_timeline.clone());

View File

@@ -37,6 +37,12 @@ pub(crate) struct StorageControllerMetricGroup {
/// Count of how many times we spawn a reconcile task
pub(crate) storage_controller_reconcile_spawn: measured::Counter,
/// Size of the in-memory map of tenant shards
pub(crate) storage_controller_tenant_shards: measured::Gauge,
/// Size of the in-memory map of pageserver_nodes
pub(crate) storage_controller_pageserver_nodes: measured::Gauge,
/// Reconciler tasks completed, broken down by success/failure/cancelled
pub(crate) storage_controller_reconcile_complete:
measured::CounterVec<ReconcileCompleteLabelGroupSet>,

View File

@@ -450,6 +450,9 @@ impl Reconciler {
}
}
/// This function does _not_ mutate any state, so it is cancellation safe.
///
/// This function does not respect [`Self::cancel`], callers should handle that.
async fn await_lsn(
&self,
tenant_shard_id: TenantShardId,
@@ -570,8 +573,10 @@ impl Reconciler {
if let Some(baseline) = baseline_lsns {
tracing::info!("🕑 Waiting for LSN to catch up...");
self.await_lsn(self.tenant_shard_id, &dest_ps, baseline)
.await?;
tokio::select! {
r = self.await_lsn(self.tenant_shard_id, &dest_ps, baseline) => {r?;}
_ = self.cancel.cancelled() => {return Err(ReconcileError::Cancel)}
};
}
tracing::info!("🔁 Notifying compute to use pageserver {dest_ps}");

View File

@@ -934,7 +934,6 @@ impl Service {
self.startup_complete.clone().wait().await;
const BACKGROUND_RECONCILE_PERIOD: Duration = Duration::from_secs(20);
let mut interval = tokio::time::interval(BACKGROUND_RECONCILE_PERIOD);
while !self.reconcilers_cancel.is_cancelled() {
tokio::select! {
@@ -1272,6 +1271,10 @@ impl Service {
.collect::<Vec<_>>();
let nodes: HashMap<NodeId, Node> = nodes.into_iter().map(|n| (n.get_id(), n)).collect();
tracing::info!("Loaded {} nodes from database.", nodes.len());
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_pageserver_nodes
.set(nodes.len() as i64);
tracing::info!("Loading shards from database...");
let mut tenant_shard_persistence = persistence.list_tenant_shards().await?;
@@ -4110,9 +4113,9 @@ impl Service {
(
old_attached,
generation,
old_state.policy,
old_state.policy.clone(),
old_state.shard,
old_state.config,
old_state.config.clone(),
)
};
@@ -5075,6 +5078,10 @@ impl Service {
let mut nodes = (*locked.nodes).clone();
nodes.remove(&node_id);
locked.nodes = Arc::new(nodes);
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_pageserver_nodes
.set(locked.nodes.len() as i64);
locked.scheduler.node_remove(node_id);
@@ -5158,6 +5165,10 @@ impl Service {
removed_node.set_availability(NodeAvailability::Offline);
}
*nodes = Arc::new(nodes_mut);
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_pageserver_nodes
.set(nodes.len() as i64);
}
}
@@ -5346,6 +5357,11 @@ impl Service {
locked.nodes = Arc::new(new_nodes);
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_pageserver_nodes
.set(locked.nodes.len() as i64);
tracing::info!(
"Registered pageserver {}, now have {} pageservers",
register_req.node_id,

View File

@@ -473,6 +473,11 @@ impl TenantShard {
shard: ShardIdentity,
policy: PlacementPolicy,
) -> Self {
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_tenant_shards
.inc();
Self {
tenant_shard_id,
policy,
@@ -1384,6 +1389,11 @@ impl TenantShard {
let tenant_shard_id = tsp.get_tenant_shard_id()?;
let shard_identity = tsp.get_shard_identity()?;
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_tenant_shards
.inc();
Ok(Self {
tenant_shard_id,
shard: shard_identity,
@@ -1512,6 +1522,15 @@ impl TenantShard {
}
}
impl Drop for TenantShard {
fn drop(&mut self) {
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_tenant_shards
.dec();
}
}
#[cfg(test)]
pub(crate) mod tests {
use std::{cell::RefCell, rc::Rc};

View File

@@ -16,6 +16,7 @@ from typing import TYPE_CHECKING, Any, Callable, TypeVar
from urllib.parse import urlencode
import allure
import pytest
import zstandard
from psycopg2.extensions import cursor
from typing_extensions import override
@@ -634,9 +635,27 @@ def allpairs_versions():
the different versions.
"""
ids = []
argvalues = []
compat_not_defined = (
os.getenv("COMPATIBILITY_POSTGRES_DISTRIB_DIR") is None
or os.getenv("COMPATIBILITY_NEON_BIN") is None
)
for pair in VERSIONS_COMBINATIONS:
cur_id = []
all_new = all(v == "new" for v in pair.values())
for component in sorted(pair.keys()):
cur_id.append(pair[component][0])
# Adding None if all versions are new, sof no need to mix at all
# If COMPATIBILITY_NEON_BIN or COMPATIBILITY_POSTGRES_DISTRIB_DIR are not defined,
# we will skip all the tests which include the versions mix.
argvalues.append(
pytest.param(
None if all_new else pair,
marks=pytest.mark.skipif(
compat_not_defined and not all_new,
reason="COMPATIBILITY_NEON_BIN or COMPATIBILITY_POSTGRES_DISTRIB_DIR is not set",
),
)
)
ids.append(f"combination_{''.join(cur_id)}")
return {"argnames": "combination", "argvalues": VERSIONS_COMBINATIONS, "ids": ids}
return {"argnames": "combination", "argvalues": tuple(argvalues), "ids": ids}

View File

@@ -0,0 +1,52 @@
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
def test_lfc_prewarm(neon_simple_env: NeonEnv):
env = neon_simple_env
n_records = 1000000
endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=[
"autovacuum = off",
"shared_buffers=1MB",
"neon.max_file_cache_size=1GB",
"neon.file_cache_size_limit=1GB",
"neon.file_cache_prewarm_limit=1000",
],
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
endpoint.stop()
endpoint.start()
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon version '1.6'")
for _ in range(60):
time.sleep(1) # give prewarm BGW some time to proceed
cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'")
lfc_used_pages = cur.fetchall()[0][0]
log.info(f"Used LFC size: {lfc_used_pages}")
cur.execute("select * from get_prewarm_info()")
prewarm_info = cur.fetchall()[0]
log.info(f"Prewarm info: {prewarm_info}")
if prewarm_info[0] > 0:
log.info(f"Prewarm progress: {prewarm_info[1]*100//prewarm_info[0]}%")
if prewarm_info[0] == prewarm_info[1]:
break
assert lfc_used_pages > 10000
assert prewarm_info[0] > 0 and prewarm_info[0] == prewarm_info[1]
cur.execute("select sum(pk) from t")
assert cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
assert prewarm_info[1] > 0

View File

@@ -107,6 +107,15 @@ def test_storage_controller_smoke(neon_env_builder: NeonEnvBuilder, combination)
for tid in tenant_ids:
env.create_tenant(tid, shard_count=shards_per_tenant)
# Validate high level metrics
assert (
env.storage_controller.get_metric_value("storage_controller_tenant_shards")
== len(tenant_ids) * shards_per_tenant
)
assert env.storage_controller.get_metric_value("storage_controller_pageserver_nodes") == len(
env.storage_controller.node_list()
)
# Repeating a creation should be idempotent (we are just testing it doesn't return an error)
env.storage_controller.tenant_create(
tenant_id=next(iter(tenant_ids)), shard_count=shards_per_tenant

View File

@@ -435,7 +435,9 @@ def test_emergency_relocate_with_branches_slow_replay(
# This fail point will pause the WAL ingestion on the main branch, after the
# the first insert
pageserver_http.configure_failpoints([("wal-ingest-logical-message-sleep", "return(5000)")])
pageserver_http.configure_failpoints(
[("pageserver-wal-ingest-logical-message-sleep", "return(5000)")]
)
# Attach and wait a few seconds to give it time to load the tenants, attach to the
# safekeepers, and to stream and ingest the WAL up to the pause-point.
@@ -453,11 +455,13 @@ def test_emergency_relocate_with_branches_slow_replay(
assert cur.fetchall() == [("before pause",), ("after pause",)]
# Sanity check that the failpoint was reached
env.pageserver.assert_log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done')
env.pageserver.assert_log_contains(
'failpoint "pageserver-wal-ingest-logical-message-sleep": sleep done'
)
assert time.time() - before_attach_time > 5
# Clean up
pageserver_http.configure_failpoints(("wal-ingest-logical-message-sleep", "off"))
pageserver_http.configure_failpoints(("pageserver-wal-ingest-logical-message-sleep", "off"))
# Simulate hard crash of pageserver and re-attach a tenant with a branch
@@ -581,7 +585,9 @@ def test_emergency_relocate_with_branches_createdb(
# bug reproduced easily even without this, as there is always some delay between
# loading the timeline and establishing the connection to the safekeeper to stream and
# ingest the WAL, but let's make this less dependent on accidental timing.
pageserver_http.configure_failpoints([("wal-ingest-logical-message-sleep", "return(5000)")])
pageserver_http.configure_failpoints(
[("pageserver-wal-ingest-logical-message-sleep", "return(5000)")]
)
before_attach_time = time.time()
env.pageserver.tenant_attach(tenant_id)
@@ -590,8 +596,10 @@ def test_emergency_relocate_with_branches_createdb(
assert query_scalar(cur, "SELECT count(*) FROM test_migrate_one") == 200
# Sanity check that the failpoint was reached
env.pageserver.assert_log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done')
env.pageserver.assert_log_contains(
'failpoint "pageserver-wal-ingest-logical-message-sleep": sleep done'
)
assert time.time() - before_attach_time > 5
# Clean up
pageserver_http.configure_failpoints(("wal-ingest-logical-message-sleep", "off"))
pageserver_http.configure_failpoints(("pageserver-wal-ingest-logical-message-sleep", "off"))

View File

@@ -4,8 +4,11 @@ import pytest
from fixtures.common_types import TenantId, TimelineArchivalState, TimelineId
from fixtures.neon_fixtures import (
NeonEnvBuilder,
last_flush_lsn_upload,
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import assert_prefix_empty, assert_prefix_not_empty
from fixtures.remote_storage import s3_storage
from fixtures.utils import wait_until
@@ -168,7 +171,7 @@ def test_timeline_offloading(neon_env_builder: NeonEnvBuilder, manual_offload: b
state=TimelineArchivalState.ARCHIVED,
)
def timeline_offloaded(timeline_id: TimelineId) -> bool:
def timeline_offloaded_logged(timeline_id: TimelineId) -> bool:
return (
env.pageserver.log_contains(f".*{timeline_id}.* offloading archived timeline.*")
is not None
@@ -186,12 +189,12 @@ def test_timeline_offloading(neon_env_builder: NeonEnvBuilder, manual_offload: b
def parent_offloaded():
if manual_offload:
ps_http.timeline_offload(tenant_id=tenant_id, timeline_id=parent_timeline_id)
assert timeline_offloaded(parent_timeline_id)
assert timeline_offloaded_logged(parent_timeline_id)
def leaf_offloaded():
if manual_offload:
ps_http.timeline_offload(tenant_id=tenant_id, timeline_id=leaf_timeline_id)
assert timeline_offloaded(leaf_timeline_id)
assert timeline_offloaded_logged(leaf_timeline_id)
wait_until(30, 1, leaf_offloaded)
wait_until(30, 1, parent_offloaded)
@@ -218,4 +221,118 @@ def test_timeline_offloading(neon_env_builder: NeonEnvBuilder, manual_offload: b
sum_again = endpoint.safe_psql("SELECT sum(key) from foo where key > 50")
assert sum == sum_again
assert not timeline_offloaded(initial_timeline_id)
assert not timeline_offloaded_logged(initial_timeline_id)
def test_timeline_offload_persist(neon_env_builder: NeonEnvBuilder):
"""
Test for persistence of timeline offload state
"""
remote_storage_kind = s3_storage()
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
ps_http = env.pageserver.http_client()
# Turn off gc and compaction loops: we want to issue them manually for better reliability
tenant_id, root_timeline_id = env.create_tenant(
conf={
"gc_period": "0s",
"compaction_period": "0s",
"checkpoint_distance": f"{1024 ** 2}",
}
)
# Create a branch and archive it
child_timeline_id = env.create_branch("test_archived_branch_persisted", tenant_id)
with env.endpoints.create_start(
"test_archived_branch_persisted", tenant_id=tenant_id
) as endpoint:
endpoint.safe_psql_many(
[
"CREATE TABLE foo(key serial primary key, t text default 'data_content')",
"INSERT INTO foo SELECT FROM generate_series(1,2048)",
]
)
sum = endpoint.safe_psql("SELECT sum(key) from foo where key < 500")
last_flush_lsn_upload(env, endpoint, tenant_id, child_timeline_id)
assert_prefix_not_empty(
neon_env_builder.pageserver_remote_storage,
prefix=f"tenants/{str(tenant_id)}/",
)
assert_prefix_empty(
neon_env_builder.pageserver_remote_storage,
prefix=f"tenants/{str(tenant_id)}/tenant-manifest",
)
ps_http.timeline_archival_config(
tenant_id,
child_timeline_id,
state=TimelineArchivalState.ARCHIVED,
)
leaf_detail = ps_http.timeline_detail(
tenant_id,
child_timeline_id,
)
assert leaf_detail["is_archived"] is True
def timeline_offloaded_api(timeline_id: TimelineId) -> bool:
# TODO add a proper API to check if a timeline has been offloaded or not
return not any(
timeline["timeline_id"] == str(timeline_id)
for timeline in ps_http.timeline_list(tenant_id=tenant_id)
)
def child_offloaded():
ps_http.timeline_offload(tenant_id=tenant_id, timeline_id=child_timeline_id)
assert timeline_offloaded_api(child_timeline_id)
wait_until(30, 1, child_offloaded)
assert timeline_offloaded_api(child_timeline_id)
assert not timeline_offloaded_api(root_timeline_id)
assert_prefix_not_empty(
neon_env_builder.pageserver_remote_storage,
prefix=f"tenants/{str(tenant_id)}/tenant-manifest",
)
# Test persistence, is the timeline still offloaded?
env.pageserver.stop()
env.pageserver.start()
assert timeline_offloaded_api(child_timeline_id)
assert not timeline_offloaded_api(root_timeline_id)
ps_http.timeline_archival_config(
tenant_id,
child_timeline_id,
state=TimelineArchivalState.UNARCHIVED,
)
child_detail = ps_http.timeline_detail(
tenant_id,
child_timeline_id,
)
assert child_detail["is_archived"] is False
with env.endpoints.create_start(
"test_archived_branch_persisted", tenant_id=tenant_id
) as endpoint:
sum_again = endpoint.safe_psql("SELECT sum(key) from foo where key < 500")
assert sum == sum_again
assert_prefix_empty(
neon_env_builder.pageserver_remote_storage,
prefix=f"tenants/{str(env.initial_tenant)}/tenant-manifest",
)
assert not timeline_offloaded_api(root_timeline_id)
ps_http.tenant_delete(tenant_id)
assert_prefix_empty(
neon_env_builder.pageserver_remote_storage,
prefix=f"tenants/{str(tenant_id)}/",
)

View File

@@ -1,18 +1,18 @@
{
"v17": [
"17.0",
"68b5038f27e493bde6ae552fe066f10cbdfe6a14"
"37d5ead146b028dd9a5c07e7a37068ec0df9f465"
],
"v16": [
"16.4",
"e131a9c027b202ce92bd7b9cf2569d48a6f9948e"
"cc36e03bd0c927022cf3b3563e291e42d75366a1"
],
"v15": [
"15.8",
"22e580fe9ffcea7e02592110b1c9bf426d83cada"
"a4830163a65811578824ce4022c1cd3daef33d4e"
],
"v14": [
"14.13",
"2199b83fb72680001ce0f43bf6187a21dfb8f45d"
"ecb1020ff71927e9dd59c526254bb8846bb73ee1"
]
}