Compare commits

..

2 Commits

Author SHA1 Message Date
danieltprice
1a7afd7c1a Update CHANGELOG.md
Revie changelog format
2023-05-28 08:25:31 -03:00
danieltprice
89e988d9d8 Create CHANGELOG.md
Add changelog document. The intention is to make the Neon Release Notes more focused on the end user. The audience for this changelog will be internal and external developers. Entries in the changelog can be more technical and may even include direct links to PRs.

The initial content that you see in this PR is taken from the Neon Release Notes. Expect the changelog content for future releases to differ as it becomes more technical in nature, written for developers as the primary audience.
2023-05-28 07:26:45 -03:00
28 changed files with 792 additions and 866 deletions

@@ -492,24 +492,19 @@ jobs:
env:
COMMIT_URL: ${{ github.server_url }}/${{ github.repository }}/commit/${{ github.event.pull_request.head.sha || github.sha }}
run: |
scripts/coverage --dir=/tmp/coverage \
report \
scripts/coverage \
--dir=/tmp/coverage report \
--input-objects=/tmp/coverage/binaries.list \
--commit-url=${COMMIT_URL} \
--format=github
scripts/coverage --dir=/tmp/coverage \
report \
--input-objects=/tmp/coverage/binaries.list \
--format=lcov
- name: Upload coverage report
id: upload-coverage-report
env:
BUCKET: neon-github-public-dev
COMMIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
run: |
aws s3 cp --only-show-errors --recursive /tmp/coverage/report s3://${BUCKET}/code-coverage/${COMMIT_SHA}
aws s3 cp --only-show-errors --recursive /tmp/coverage/report s3://neon-github-public-dev/code-coverage/${COMMIT_SHA}
REPORT_URL=https://${BUCKET}.s3.amazonaws.com/code-coverage/${COMMIT_SHA}/index.html
echo "report-url=${REPORT_URL}" >> $GITHUB_OUTPUT
@@ -802,7 +797,7 @@ jobs:
- name: Build vm image
run: |
./vm-builder -enable-file-cache -src=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} -dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
./vm-builder -src=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} -dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
- name: Pushing vm-compute-node image
run: |

docs/CHANGELOG.md Normal file

@@ -0,0 +1,392 @@
# CHANGELOG
This is the Neon storage and compute changelog. It is a record of **notable changes** made to this project. The intended audience is developers, including those on the Neon team and external developers who may use or contribute to this project. The format of the changelog is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
The changes documented here may also appear in the [Neon Release Notes](https://neon.tech/docs/release-notes), but the information provided here is typically more technical in nature and aimed at developers. The _Neon Release Notes_ are more user-oriented and focused on benefits and usage of the new features or changes.
## 2023-05-23
### Added
- Compute: Implemented a `cargo neon` utility to facilitate setting up the Neon project locally. [Setup instructions](https://github.com/neondatabase/neon#running-neon-database) have been updated to reflect this change.
### Changed
- Compute: Updated supported PostgreSQL versions to 14.8 and 15.3.
## 2023-05-16
### Changed
- Proxy: Neon uses compute endpoint domain names to route incoming client connections. For example, to connect to the compute endpoint `ep-mute-recipe-239816`, we ask that you connect to `ep-mute-recipe-239816.us-east-2.aws.neon.tech`. However, the PostgreSQL wire protocol does not transfer the server domain name, so Neon relies on the Server Name Indication (SNI) extension of the TLS protocol to do this. Unfortunately, not all PostgreSQL clients support SNI. When these clients attempt to connect, they receive an error indicating that the "endpoint ID is not specified".
As a workaround, Neon provides a special connection option that allows clients to specify the compute endpoint they are connecting to. The connection option was previously named `project`. This option name is now deprecated but remains supported for backward compatibility. The new name for the connection option is `endpoint`, which is used as shown in the following example:
```txt
postgres://<user>:<password>@ep-mute-recipe-239816.us-east-2.aws.neon.tech/main?options=endpoint%3Dep-mute-recipe-239816
```
For more information about this special connection option for PostgreSQL clients that do not support SNI, refer to our [connection workarounds](https://neon.tech/docs/connect/connectivity-issues#workarounds) documentation.
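As a rough illustration of the `endpoint` option described above, the following Rust sketch assembles a connection URI of the same shape as the example. The helper name `connection_uri` is hypothetical, and the sketch assumes that the endpoint ID is the first label of the host name and that the user name and password need no percent-encoding.

```rust
/// Hypothetical helper: builds a connection URI that passes the endpoint ID
/// through the `options` parameter, for clients that do not support SNI.
/// Assumption: the endpoint ID is the first DNS label of `host`.
/// Assumption: `user` and `password` contain no characters that need encoding.
fn connection_uri(user: &str, password: &str, host: &str, db: &str) -> Option<String> {
    // Take everything before the first '.'; reject an empty label.
    let endpoint_id = host.split('.').next().filter(|s| !s.is_empty())?;
    // `=` inside the option value is percent-encoded as `%3D`.
    Some(format!(
        "postgres://{user}:{password}@{host}/{db}?options=endpoint%3D{endpoint_id}"
    ))
}

fn main() {
    let uri = connection_uri(
        "alice",
        "secret",
        "ep-mute-recipe-239816.us-east-2.aws.neon.tech",
        "main",
    )
    .expect("host name must start with an endpoint ID");
    println!("{uri}");
}
```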
### Fixed
- Pageserver: Branch deletion status was not tracked in S3 storage, which could result in a deleted branch remaining accessible.
- Pageserver: Addressed intermittent `failed to flush page requests` errors by adjusting Pageserver timeout settings.
## 2023-05-05
### Added
- Pageserver: Added WAL receiver context information for `Timed out while waiting for WAL record` errors. The additional information is used for diagnostic purposes.
- Safekeeper, Pageserver: Added Safekeeper and Pageserver metrics that count the number of received queries, broker messages, removed WAL segments, and connection switch events.
### Fixed
- Safekeeper: When establishing a connection to a Safekeeper, an `Lsn::INVALID` value was sent from the Safekeeper to the Pageserver if there were no WAL records to send. This incorrectly indicated to the Pageserver that the Safekeeper was lagging behind, causing the Pageserver to connect to a different Safekeeper. The most recent `commit_lsn` value is now sent instead of `Lsn::INVALID`.
## 2023-05-09
### Changed
- Compute: Dockerfile updates for the [neondatabase/neon](https://github.com/neondatabase/neon) repository are now automatically pushed to our [public Docker Hub repository](https://hub.docker.com/u/neondatabase). This enhancement means that you no longer need to manually track and incorporate Dockerfile updates to build and test Neon locally. Instead, these changes will be available and ready to use directly from our Docker Hub repository.
- Safekeepers: Enabled parallel offload of WAL segments to remote storage. This change allows Safekeepers to upload up to five WAL segments concurrently.
## 2023-04-28
### Added
- Compute: Added support for the `US East (N. Virginia) — aws-us-east-1` region. For more information about Neon's region support, see [Regions](https://neon.tech/docs/introduction/regions).
- Compute: Added support for the `ip4r` and `pg_hint_plan` extensions. For more information about PostgreSQL extensions supported by Neon, see [PostgreSQL extensions](https://neon.tech/docs/extensions/pg-extensions).
- Compute: Added support for `lz4` and `zstd` WAL compression methods.
- Compute: Added support for `procps`, which is a set of utilities for process monitoring.
- Pageserver: Implemented `syscalls` changes in the WAL redo `seccomp` (secure computing mode) code to ensure AArch64 support.
## 2023-04-11
### Added
- Pageserver: Added `disk_size` and `instance_type` properties to the Pageserver API. This data is required to support assigning Neon projects to Pageservers based on Pageserver disk usage.
- Proxy: Added error reporting for unusually low `proxy_io_bytes_per_client` metric values.
- Proxy: Added support for additional domain names to enable partner integrations with Neon.
- Safekeeper: The `wal_backup_lsn` is now advanced after each WAL segment is offloaded to storage to avoid lags in WAL segment cleanup.
- Safekeeper: Added a timeout for reading from the socket in the Safekeeper WAL service to avoid an accumulation of waiting threads.
### Fixed
- Pageserver: Corrected an issue that caused data layer eviction to occur at a percentage above the configured disk-usage threshold.
- Proxy: The passwordless authentication proxy ignored non-wildcard common names, passing a `None` value instead. A non-wildcard common name is now set, and an error is reported if a `None` value is passed.
## 2023-03-28
### Changed
- Pageserver: Logical size and partitioning values are now computed before threshold-based eviction of data layers to avoid downloading previously evicted data layer files when restarting a Pageserver.
- Compute: Free space in the local file system that Neon uses for temporary files, unlogged tables, and the local file cache, is now monitored in order to maximize the space available for the local file cache.
### Fixed
- Pageserver: The delete timeline endpoint in the Pageserver management API did not return the proper HTTP code.
- Pageserver: Fixed an issue in a data storage size calculation that caused an incorrect value to be returned.
- Pageserver: Addressed unexpected data layer downloads that occurred after a Pageserver restart. The data layers most likely required for the data storage size calculation after a Pageserver restart are now retained.
## 2023-03-21
### Added
- Added metrics that enable detection of data layer eviction thrashing (repetition of eviction and on-demand download of data layers).
- Safekeeper: Added an internal metric to track bytes written or read in PostgreSQL connections to Safekeepers, which enables monitoring traffic between availability zones.
### Changed
- Pageserver: Improved the check for unexpected trailing data when importing a basebackup, which is a tarball with the files required to bootstrap a compute node.
- Pageserver: Separated the management and `libpq` configuration, making it possible to enable authentication for only the management HTTP API or the Compute API.
- Pageserver: Reduced the amount of metrics data collected for Pageservers.
- Pageserver: JWT (JSON Web Token) generation is now permitted to fail when running Pageservers with authentication disabled, which enables running without the 'openssl' binary. This change enables switching to the EdDSA algorithm for storing JWT authentication tokens.
- Pageserver: Switched to the EdDSA algorithm for the storage JWT authentication tokens. The Neon Control Plane only supports EdDSA.
- Pageserver, Safekeeper: Revised `$NEON_AUTH_TOKEN` variable handling when connecting from a compute to Pageservers and Safekeepers.
- Proxy: All compute node connection errors are now logged.
### Fixed
- Pageserver: Fixed an issue that resulted in old data layers not being garbage collected.
- Proxy: Fixed an issue that caused Websocket connections through the Proxy to become unresponsive.
### Removed
- Pageserver, Safekeeper: Removed unused Control Plane code.
## 2023-03-13
### Added
- Compute: Released a new `pg_tiktoken` PostgreSQL extension, created by the Neon engineering team. The extension is a wrapper for [OpenAI's tokenizer](https://github.com/openai/tiktoken). It provides fast and efficient tokenization of data stored in a PostgreSQL database.
The extension supports two functions:
- The `tiktoken_encode` function takes text input and returns tokenized output, making it easier to analyze and process text data.
- The `tiktoken_count` function returns the number of tokens in a text, which is useful for checking text length limits, such as those imposed by OpenAI's language models.
For more information about the `pg_tiktoken` extension, refer to the blog post: [Announcing pg_tiktoken: A Postgres Extension for Fast BPE Tokenization](https://neon.tech/blog/announcing-pg_tiktoken-a-postgres-extension-for-fast-bpe-tokenization). The `pg_tiktoken` code is available on [GitHub](https://github.com/kelvich/pg_tiktoken).
- Compute: Added support for the PostgreSQL `prefix`, `hll` and `plpgsql_check` extensions. For more information about PostgreSQL extensions supported by Neon, see [PostgreSQL extensions](https://neon.tech/docs/extensions/pg-extensions).
- Compute, Pageserver, Safekeeper: Added support for RS384 and RS512 JWT tokens, used to securely transmit information as JSON objects.
- Autoscaling: Added support for scaling Neon's local file cache size when scaling a virtual machine.
### Removed
- Pageserver: Removed the block cursor cache, which provided little performance benefit and would hold page references that caused deadlocks.
## 2023-03-07
### Added
- Pageserver: Added logic to handle unexpected Write-Ahead Log (WAL) redo process failures, which could cause a `Broken pipe` error on the Pageserver. In case of a failure, the WAL redo process is now restarted, and requests to apply redo records are retried automatically.
- Pageserver: Added timeout logic for the copy operation that occurs when downloading a data layer. The timeout logic prevents a deadlock state if a data layer download is blocked.
### Fixed
- Safekeeper: Addressed `Failed to open WAL file` warnings that appeared in the Safekeeper log files. The warnings were due to an outdated `truncate_lsn` value on the Safekeeper, which caused the _walproposer_ (the Postgres compute node) to download WAL records starting from a Log Sequence Number (LSN) that was older than the `backup_lsn`. This resulted in unnecessary WAL record downloads from cold storage.
## 2023-03-03
### Added
- Compute: Added support for the PostgreSQL `rum` and `pgTAP` extensions. For more information about PostgreSQL extensions supported by Neon, see [PostgreSQL extensions](https://neon.tech/docs/extensions/pg-extensions).
### Fixed
- Pageserver: A system metric that monitors physical data size overflowed when a garbage collection operation was performed on an evicted data layer.
- Pageserver: An index upload was skipped when a compaction operation did not perform an on-demand download from storage. With no on-demand downloads, the compaction function would exit before scheduling the index upload.
## 2023-02-28
### Added
- Compute: Added support for the following PostgreSQL extensions:
- `pg_graphql`
- `pg_jsonschema`
- `pg_hashids`
- `pgrouting`
- `hypopg`
- Server Programming Interface (SPI) extensions:
- `autoinc`
- `insert_username`
- `moddatetime`
- `refint`
For more information about PostgreSQL extensions supported by Neon, see [PostgreSQL extensions](https://neon.tech/docs/extensions/pg-extensions).
### Changed
- Compute: Updated supported PostgreSQL versions to [14.7](https://www.postgresql.org/docs/release/14.7/) and [15.2](https://www.postgresql.org/docs/release/15.2/).
- Pageserver: Optimized the log-structured merge-tree (LSM tree) implementation to reduce [write amplification](https://en.wikipedia.org/wiki/Write_amplification).
## 2023-02-21
### Added
- Compute: Added support for the PostgreSQL `xml2` and `pgjwt` extensions. For more information about PostgreSQL extensions supported by Neon, see [PostgreSQL extensions](https://neon.tech/docs/extensions/pg-extensions).
### Changed
- Compute: Updated the versions for the following PostgreSQL extensions:
- Updated the `address_standardizer`, `address_standardizer_data_us`, `postgis`, `postgis_raster`, `postgis_tiger_geocoder`, `postgis_topology` extensions to version `3.3.2`.
- Updated the `plv8`, `plls`, `plcoffee` extensions to `3.1.5`.
- Updated the `h3_pg` extension to `4.1.2`.
Updating an extension version requires running an `ALTER EXTENSION <extension_name> UPDATE TO <new_version>` statement. For example, to update the `postgis_topology` extension to the newly supported version, run this statement:
```sql
ALTER EXTENSION postgis_topology UPDATE TO '3.3.2';
```
- Proxy: Enabled `OpenTelemetry` tracing to capture all incoming requests. This change enables Neon to perform an end-to-end trace when a new connection is established.
### Fixed
- Pageserver: Corrected the storage size metrics calculation to ensure that only active branches are counted.
## 2023-02-14
### Added
- Compute: Added support for the PostgreSQL `pgvector`, `plls` and `plcoffee` extensions. For more information about PostgreSQL extensions supported by Neon, see [PostgreSQL extensions](https://neon.tech/docs/extensions/pg-extensions).
- Pageserver: Added an experimental feature that automatically evicts layer files from Pageservers to optimize local storage space usage. When enabled for a project, Pageservers periodically check the access timestamps of the project's layer files. If the most recent layer file access is further in the past than a configurable threshold, the Pageserver removes the layer file from local storage. The authoritative copy of the layer file remains in S3. A Pageserver can download a layer file from S3 on-demand if it is needed again, to reconstruct a page version for a client request, for example.
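The threshold check described above can be sketched roughly as follows. This is a simplified illustration, not the Pageserver implementation: `LocalLayer` and `layers_to_evict` are hypothetical names, and the sketch only selects eviction candidates without touching any files or remote storage.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical stand-in for a layer file resident on local disk.
struct LocalLayer {
    name: String,
    last_access: SystemTime,
}

/// Returns the names of layers whose most recent access lies further in the
/// past than `threshold`, measured from `now`.
fn layers_to_evict(layers: &[LocalLayer], now: SystemTime, threshold: Duration) -> Vec<&str> {
    layers
        .iter()
        .filter(|layer| match now.duration_since(layer.last_access) {
            Ok(idle) => idle > threshold,
            // An access time later than `now` (clock skew): keep the layer.
            Err(_) => false,
        })
        .map(|layer| layer.name.as_str())
        .collect()
}

fn main() {
    let now = SystemTime::now();
    let layers = vec![
        LocalLayer { name: "layer-a".to_string(), last_access: now - Duration::from_secs(3600) },
        LocalLayer { name: "layer-b".to_string(), last_access: now - Duration::from_secs(30) },
    ];
    // With a 10-minute threshold, only the layer idle for an hour is selected.
    for name in layers_to_evict(&layers, now, Duration::from_secs(600)) {
        println!("evict {name}");
    }
}
```

Because the authoritative copy stays in remote storage, a candidate chosen this way can be removed locally and downloaded again on demand.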
### Changed
- Proxy: Reduced network latencies for WebSocket and pooled connections by implementing a caching mechanism for compute node connection information and enabling the `TCP_NODELAY` protocol option. The `TCP_NODELAY` option causes segments to be sent as soon as possible, even if there is only a small amount of data. For more information, refer to the [TCP protocol man page](https://linux.die.net/man/7/tcp).
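For readers unfamiliar with the option, this minimal sketch shows how `TCP_NODELAY` is enabled on a socket using the Rust standard library. It is not the Proxy code; the helper name `connect_with_nodelay` is hypothetical, and the example connects to a throwaway listener on the loopback interface.

```rust
use std::io;
use std::net::{SocketAddr, TcpListener, TcpStream};

/// Hypothetical helper: opens a TCP connection and enables TCP_NODELAY so that
/// small writes are sent immediately instead of being coalesced.
fn connect_with_nodelay(addr: SocketAddr) -> io::Result<TcpStream> {
    let stream = TcpStream::connect(addr)?;
    stream.set_nodelay(true)?;
    Ok(stream)
}

fn main() -> io::Result<()> {
    // Bind to port 0 so the OS picks a free port for this demonstration.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let stream = connect_with_nodelay(listener.local_addr()?)?;
    println!("TCP_NODELAY enabled: {}", stream.nodelay()?);
    Ok(())
}
```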
## 2023-02-07
### Added
- Compute: Added support for the PostgreSQL `postgis-sfcgal` extension. For more information about PostgreSQL extensions supported by Neon, see [PostgreSQL extensions](https://neon.tech/docs/extensions/pg-extensions).
- Compute: Added support for [International Components for Unicode (ICU)](https://icu.unicode.org/), which permits defining collation objects that use ICU as the collation provider. For example:
```sql
CREATE COLLATION german (provider = icu, locale = 'de');
```
## 2023-01-23
### Fixed
- Compute: Fixed a compute instance restart error. When a compute instance was restarted after a role was deleted in the console, the restart operation failed with a "role does not exist" error while attempting to reassign the objects owned by the deleted role.
## 2023-01-31
### Added
- Compute: Added support for the PostgreSQL `unit` extension. For more information about PostgreSQL extensions supported by Neon, see [PostgreSQL extensions](https://neon.tech/docs/extensions/pg-extensions).
- Pageserver: Implemented an asynchronous pipe for communication with the Write-Ahead Log (WAL) redo process, which helps improve OLAP query performance.
### Changed
- Pageserver: Reimplemented the layer map used to track the data layers in a branch. The layer map now uses an immutable binary search tree (BST) data structure, which improves data layer lookup performance over the previous R-tree implementation. The data required to reconstruct page versions is stored as data layers in Neon Pageservers.
- Pageserver: Changed the garbage collection (`gc`) interval from 100 seconds to 60 minutes. This change reduces the frequency of layer map locks.
### Removed
- Compute: Removed logic that updated roles each time a Neon compute instance was restarted. Roles were updated on each restart to address a password-related backward compatibility issue that is no longer relevant.
## 2023-01-17
### Added
- Compute: Added support for several PostgreSQL extensions. Newly supported extensions include:
- `bloom`
- `pgrowlocks`
- `intagg`
- `pgstattuple`
- `earthdistance`
- `address_standardizer`
- `address_standardizer_data_us`
For more information about PostgreSQL extensions supported by Neon, see [PostgreSQL extensions](https://neon.tech/docs/extensions/pg-extensions).
- Compute: Added statistics to `EXPLAIN` that show prefetch hits and misses for sequential scans.
### Changed
- Compute: Updated the list of PostgreSQL client libraries and runtimes that Neon tests for connection support. The `pg8000` Python PostgreSQL driver, version 1.29.3 and higher, now supports connecting to Neon.
- Proxy: Updated the error message reported when attempting to connect from a client or driver that does not support Server Name Indication (SNI). For more information about the SNI requirement, see [Connect from old clients](https://neon.tech/docs/connect/connectivity-issues). Previously, the error message indicated that the "Project ID" is not specified. The error message now states that the "Endpoint ID" is not specified. Connecting to Neon with a Project ID remains supported for backward compatibility, but connecting with an Endpoint ID is now the recommended connection method. For general information about connecting to Neon, see [Connect from any application](https://neon.tech/docs/connect/connect-from-any-app).
## 2022-12-08
### Added
- Pageserver: Added support for on-demand download of layer files from cold storage. Layer files contain the data required to reconstruct any version of a data page. On-demand download enables Neon to quickly distribute data across Pageservers and recover from local Pageserver failures. This feature augments Neon's storage capability by allowing data to be transferred efficiently from cold storage to Pageservers whenever the data is needed.
## 2022-11-16
### Added
- Pageserver: Added a tenant sizing model and an endpoint for retrieving the tenant size.
### Changed
- Pageserver: Moved the Write-Ahead Log (WAL) redo process code from Neon's `postgres` repository to the `neon` repository and created a separate `wal_redo` binary in order to reduce the amount of change in the `postgres` repository codebase.
- Compute: Updated prefetching support to store requests and responses in a ring buffer instead of a queue, which enables using prefetches from many relations concurrently.
### Removed
- Pageserver, Safekeeper, Compute, and Proxy: Reduced the size of Neon storage binaries by 50% by removing dependency debug symbols from the release build.
- Pageserver and Safekeeper: Removed support for the `--daemonize` option from the CLI process that starts the Pageserver and Safekeeper storage components. The required library is no longer being maintained and the option was only used in our test environment.
## 2022-10-25
### Added
- Compute: Added support for PostgreSQL 15.0 and its PostgreSQL extensions.
For information about supported extensions, see [Supported PostgreSQL extensions](https://neon.tech/docs/extensions/pg-extensions).
- Pageserver: Added a timeline `state` field to the `TimelineInfo` struct that is returned by the `timelines` internal management API endpoint. Timeline `state` information improves observability and communication between Pageserver modules.
### Changed
- Compute: Disabled the `wal_log_hints` parameter, which is the default PostgreSQL setting. The Pageserver-related issue that required enabling `wal_log_hints` has been addressed, and enabling `wal_log_hints` is no longer necessary.
## 2022-10-21
### Fixed
- Compute: Fixed an issue that prevented creating a database when the specified database name included trailing spaces.
- Pageserver: Fixed an `INSERT ... ON CONFLICT` handling issue for speculative Write-Ahead Log (WAL) record inserts. Under certain load conditions, records added with `INSERT ... ON CONFLICT` could be replayed incorrectly.
- Pageserver: Fixed a Free Space Map (FSM) and Visibility Map (VM) page reconstruction issue that caused compute nodes to start slowly under certain workloads.
- Pageserver: Fixed a garbage collection (GC) issue that could lead to a database failure under high load.
- Pageserver: Improved query performance for larger databases by improving R-tree layer map search. The envelope for each layer is now remembered so that it does not have to be reconstructed for each call.
## 2022-10-07
### Added
- Pageserver: Added initial support for online tenant relocation.
- Pageserver: Added support for multiple PostgreSQL versions.
- Compute: Added support for the `h3_pg` and `plv8` PostgreSQL extensions. For information about PostgreSQL extensions supported by Neon, see [PostgreSQL extensions](https://neon.tech/docs/extensions/pg-extensions).
- Compute: Added support for a future implementation of sequential scan prefetch, which improves I/O performance for operations such as table scans.
### Changed
- Pageserver: Increased the default `compaction_period` setting to 20 seconds to reduce the frequency of polling that is performed to determine if compaction is required. The frequency of polling with the previous setting of 1 could result in excessive CPU consumption when there are numerous tenants and projects.
- Compute: Moved the backpressure throttling algorithm to the Neon extension to minimize changes to the Neon PostgreSQL core code, and added a `backpressure_throttling_time` function that returns the total time spent throttling since the system was started.
- Proxy: Improved error messages and logging.
## 2022-09-01
### Added
- Compute: Added support for the `PostGIS` extension, version 3.3.0. For information about PostgreSQL extensions supported by Neon, see [PostgreSQL extensions](https://neon.tech/docs/extensions/pg-extensions).
- Proxy: Added support for forwarding the `options`, `application_name`, and `replication` connection parameters to compute nodes.
### Changed
- Compute: Updated the PostgreSQL version to 14.5.
## 2022-08-02
### Added
- Compute: Installed the `uuid-ossp` extension binaries, which provide functions for generating universally unique identifiers (UUIDs). `CREATE EXTENSION "uuid-ossp"` is now supported. For information about extensions supported by Neon, see [Available PostgreSQL extensions](https://neon.tech/docs/extensions/pg-extensions).
- Compute: Added logging for compute node initialization failure during the 'basebackup' stage.
- Pageserver: Added reporting of the physical size with the tenant status, in the internal management API.
### Changed
- Pageserver: Merged the 'wal_receiver' endpoint with 'timeline_detail', in the internal management API.
### Fixed
- Pageserver: Avoided busy looping when deletion from cloud storage is skipped due to failed upload tasks.
## 2022-07-19
### Added
- Safekeeper: Added support for backing up Write-Ahead Logs (WAL) to S3 storage for disaster recovery.
- Safekeeper: Added support for downloading WAL from S3 storage on demand.
- Proxy: Added support for propagating SASL/SCRAM PostgreSQL authentication errors to clients.
- Pageserver: Implemented a page service `fullbackup` endpoint that works like basebackup but also sends relational files.
- Pageserver: Added support for importing a base backup taken from a standalone PostgreSQL instance or another Pageserver using `psql` copy.
- Compute: Enabled the use of the `CREATE EXTENSION` statement for users that are not database owners.
### Changed
- Safekeeper: Switched to [etcd](https://etcd.io/) subscriptions to keep Pageservers up to date with the Safekeeper status.
- Safekeeper: Implemented JSON Web Token (JWT) authentication in the Safekeeper HTTP API.
- Compute: Updated the PostgreSQL version to 14.4.
- Compute: Renamed the following custom configuration parameters:
- `zenith.page_server_connstring` to `neon.pageserver_connstring`
- `zenith.zenith_tenant` to `neon.tenant_id`
- `zenith.zenith_timeline` to `neon.timeline_id`
- `zenith.max_cluster_size` to `neon.max_cluster_size`
- `wal_acceptors` to `safekeepers`
- Control Plane: Renamed `zenith_admin` role to `cloud_admin`.
- Pageserver: Updated the timeline size reported when `DROP DATABASE` is executed.
- Pageserver: Switched to per-tenant attach/detach. Download operations of all timelines for one tenant are now grouped together so that branches can be used safely with attach/detach.
- Pageserver: Decreased the number of threads by running gc and compaction in a blocking tokio thread pool.
### Fixed
- Pageserver: Fixed the database size calculation to count Visibility Maps (VMs) and Free Space Maps (FSMs) in addition to the main fork of the relation.
- Safekeeper: Fixed the walreceiver connection selection mechanism:
- Reconnecting to a Safekeeper immediately after it fails is now avoided by limiting candidates to those with the fewest connection attempts.
- Increased the `max_lsn_wal_lag` default setting to avoid constant reconnections during normal work.
- Fixed `wal_connection_attempts` maintenance, preventing busy reconnection loops.
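The candidate-limiting rule in the first walreceiver fix above can be illustrated with a short sketch. It is an assumption-laden simplification rather than the actual selection code: `SafekeeperCandidate` and `least_attempted` are hypothetical names, and real selection also weighs factors such as LSN lag that are not modeled here.

```rust
/// Hypothetical record of a Safekeeper the Pageserver could connect to.
struct SafekeeperCandidate {
    id: u64,
    connection_attempts: u32,
}

/// Keeps only the candidates with the fewest connection attempts, so a
/// Safekeeper that just failed is not immediately retried while others exist.
fn least_attempted(candidates: &[SafekeeperCandidate]) -> Vec<&SafekeeperCandidate> {
    let min = match candidates.iter().map(|c| c.connection_attempts).min() {
        Some(min) => min,
        None => return Vec::new(), // no candidates at all
    };
    candidates
        .iter()
        .filter(|c| c.connection_attempts == min)
        .collect()
}

fn main() {
    let candidates = vec![
        SafekeeperCandidate { id: 1, connection_attempts: 3 },
        SafekeeperCandidate { id: 2, connection_attempts: 0 },
        SafekeeperCandidate { id: 3, connection_attempts: 0 },
    ];
    for c in least_attempted(&candidates) {
        println!("candidate safekeeper {}", c.id);
    }
}
```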

@@ -18,29 +18,7 @@ use crate::reltag::RelTag;
use anyhow::bail;
use bytes::{BufMut, Bytes, BytesMut};
/// The state of a tenant in this pageserver.
///
/// ```mermaid
/// stateDiagram-v2
///
/// [*] --> Loading: spawn_load()
/// [*] --> Attaching: spawn_attach()
///
/// Loading --> Activating: activate()
/// Attaching --> Activating: activate()
/// Activating --> Active: infallible
///
/// Loading --> Broken: load() failure
/// Attaching --> Broken: attach() failure
///
/// Active --> Stopping: set_stopping(), part of shutdown & detach
/// Stopping --> Broken: late error in remove_tenant_from_memory
///
/// Broken --> [*]: ignore / detach / shutdown
/// Stopping --> [*]: remove_from_memory complete
///
/// Active --> Broken: cfg(testing)-only tenant break point
/// ```
/// A state of a tenant in pageserver's memory.
#[derive(
Clone,
PartialEq,
@@ -48,63 +26,40 @@ use bytes::{BufMut, Bytes, BytesMut};
serde::Serialize,
serde::Deserialize,
strum_macros::Display,
strum_macros::EnumString,
strum_macros::EnumVariantNames,
strum_macros::AsRefStr,
strum_macros::IntoStaticStr,
)]
#[serde(tag = "slug", content = "data")]
pub enum TenantState {
/// This tenant is being loaded from local disk.
///
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
/// This tenant is being loaded from local disk
Loading,
/// This tenant is being attached to the pageserver.
///
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
/// This tenant is being downloaded from cloud storage.
Attaching,
/// The tenant is transitioning from Loading/Attaching to Active.
///
/// While in this state, the individual timelines are being activated.
///
/// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass.
Activating(ActivatingFrom),
/// The tenant has finished activating and is open for business.
///
/// Transitions out of this state are possible through `set_stopping()` and `set_broken()`.
/// Tenant is fully operational
Active,
/// The tenant is recognized by pageserver, but it is being detached or the
/// A tenant is recognized by pageserver, but it is being detached or the
/// system is being shut down.
///
/// Transitions out of this state are possible through `set_broken()`.
Stopping,
/// The tenant is recognized by the pageserver, but can no longer be used for
/// any operations.
///
/// If the tenant fails to load or attach, it will transition to this state
/// and it is guaranteed that no background tasks are running in its name.
///
/// The other way to transition into this state is from `Stopping` state
/// through `set_broken()` called from `remove_tenant_from_memory()`. That happens
/// if the cleanup future executed by `remove_tenant_from_memory()` fails.
/// A tenant is recognized by the pageserver, but can no longer be used for
/// any operations, because it failed to be activated.
Broken { reason: String, backtrace: String },
}
impl TenantState {
pub fn attachment_status(&self) -> TenantAttachmentStatus {
use TenantAttachmentStatus::*;
// Below TenantState::Activating is used as "transient" or "transparent" state for
// attachment_status determining.
match self {
// The attach procedure writes the marker file before adding the Attaching tenant to the tenants map.
// So, technically, we can return Attached here.
// However, as soon as Console observes Attached, it will proceed with the Postgres-level health check.
// But, our attach task might still be fetching the remote timelines, etc.
// So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
Self::Attaching => Maybe,
// tenant mgr startup distinguishes attaching from loading via marker file.
// If it's loading, there is no attach marker file, i.e., attach had finished in the past.
Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
Self::Loading => Attached,
// We only reach Active after successful load / attach.
// So, call attachment status Attached.
Self::Active => Attached,
@@ -143,15 +98,6 @@ impl std::fmt::Debug for TenantState {
}
}
/// The only [`TenantState`] variants we could be `TenantState::Activating` from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum ActivatingFrom {
/// Arrived at [`TenantState::Activating`] from [`TenantState::Loading`]
Loading,
/// Arrived at [`TenantState::Activating`] from [`TenantState::Attaching`]
Attaching,
}
/// A state of a timeline in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TimelineState {
@@ -883,55 +829,4 @@ mod tests {
err
);
}
#[test]
fn tenantstatus_activating_serde() {
let states = [
TenantState::Activating(ActivatingFrom::Loading),
TenantState::Activating(ActivatingFrom::Attaching),
];
let expected = "[{\"slug\":\"Activating\",\"data\":\"Loading\"},{\"slug\":\"Activating\",\"data\":\"Attaching\"}]";
let actual = serde_json::to_string(&states).unwrap();
assert_eq!(actual, expected);
let parsed = serde_json::from_str::<Vec<TenantState>>(&actual).unwrap();
assert_eq!(states.as_slice(), &parsed);
}
#[test]
fn tenantstatus_activating_strum() {
// tests added, because we use these for metrics
let examples = [
(line!(), TenantState::Loading, "Loading"),
(line!(), TenantState::Attaching, "Attaching"),
(
line!(),
TenantState::Activating(ActivatingFrom::Loading),
"Activating",
),
(
line!(),
TenantState::Activating(ActivatingFrom::Attaching),
"Activating",
),
(line!(), TenantState::Active, "Active"),
(line!(), TenantState::Stopping, "Stopping"),
(
line!(),
TenantState::Broken {
reason: "Example".into(),
backtrace: "Looooong backtrace".into(),
},
"Broken",
),
];
for (line, rendered, expected) in examples {
let actual: &'static str = rendered.into();
assert_eq!(actual, expected, "example on {line}");
}
}
}

View File

@@ -1,33 +0,0 @@
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
/// While a reference is kept around, the associated [`Barrier::wait`] will wait.
///
/// Can be cloned, moved and kept around in futures as "guard objects".
#[derive(Clone)]
pub struct Completion(mpsc::Sender<()>);
/// Barrier will wait until all clones of [`Completion`] have been dropped.
#[derive(Clone)]
pub struct Barrier(Arc<Mutex<mpsc::Receiver<()>>>);
impl Barrier {
pub async fn wait(self) {
self.0.lock().await.recv().await;
}
pub async fn maybe_wait(barrier: Option<Barrier>) {
if let Some(b) = barrier {
b.wait().await
}
}
}
/// Create a new [`Completion`] (guard) and [`Barrier`] pair.
pub fn channel() -> (Completion, Barrier) {
let (tx, rx) = mpsc::channel::<()>(1);
let rx = Mutex::new(rx);
let rx = Arc::new(rx);
(Completion(tx), Barrier(rx))
}
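
The guard-and-barrier idea in this module can be sketched without tokio. The following is only an illustrative, blocking analogue built on `std::sync::mpsc`, not the module's implementation: the `Barrier` here is not `Clone`, `wait` blocks a thread instead of being awaited, and `demo` is a hypothetical helper added for the example.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::Duration;

/// Guard object: the barrier stays closed while any clone of it is alive.
#[allow(dead_code)]
#[derive(Clone)]
pub struct Completion(mpsc::Sender<()>);

/// Blocks in `wait` until every `Completion` clone has been dropped.
pub struct Barrier(mpsc::Receiver<()>);

impl Barrier {
    pub fn wait(self) {
        // Nothing is ever sent, so `recv` only returns (with `Err`) once
        // all senders, i.e. all `Completion` clones, are gone.
        let _ = self.0.recv();
    }
}

pub fn channel() -> (Completion, Barrier) {
    let (tx, rx) = mpsc::channel::<()>();
    (Completion(tx), Barrier(rx))
}

/// Hypothetical demo: each worker holds a guard while it "loads", and the
/// caller waits for all of them. Returns how many workers had finished
/// when the barrier opened.
pub fn demo(workers: usize) -> usize {
    let finished = Arc::new(AtomicUsize::new(0));
    let (guard, barrier) = channel();
    for i in 0..workers {
        let guard = guard.clone();
        let finished = Arc::clone(&finished);
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(10 * i as u64));
            finished.fetch_add(1, Ordering::SeqCst);
            drop(guard); // releasing the guard is what signals completion
        });
    }
    // Drop the original guard too, otherwise the barrier never opens.
    drop(guard);
    barrier.wait();
    finished.load(Ordering::SeqCst)
}
```

The point of the design is that completion is signalled by dropping the guard, so a task that finishes early, errors out, or panics still releases the barrier.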

View File

@@ -60,9 +60,6 @@ pub mod tracing_span_assert;
pub mod rate_limit;
/// Simple once-barrier and a guard which keeps barrier awaiting.
pub mod completion;
mod failpoint_macro_helpers {
/// use with fail::cfg("$name", "return(2000)")

View File

@@ -335,34 +335,13 @@ fn start_pageserver(
// Set up remote storage client
let remote_storage = create_remote_storage_client(conf)?;
// All tenant load operations carry this while they are ongoing; it will be dropped once those
// operations finish either successfully or in some other manner. However, the initial load
// will be then done, and we can start the global background tasks.
let (init_done_tx, init_done_rx) = utils::completion::channel();
// Scan the local 'tenants/' directory and start loading the tenants
let init_started_at = std::time::Instant::now();
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
broker_client.clone(),
remote_storage.clone(),
(init_done_tx, init_done_rx.clone()),
))?;
BACKGROUND_RUNTIME.spawn({
let init_done_rx = init_done_rx.clone();
async move {
init_done_rx.wait().await;
let elapsed = init_started_at.elapsed();
tracing::info!(
elapsed_millis = elapsed.as_millis(),
"Initial load completed."
);
}
});
// shared state between the disk-usage backed eviction background task and the http endpoint
// that allows triggering disk-usage based eviction manually. note that the http endpoint
// is still accessible even if background task is not configured as long as remote storage has
@@ -374,7 +353,6 @@ fn start_pageserver(
conf,
remote_storage.clone(),
disk_usage_eviction_state.clone(),
init_done_rx.clone(),
)?;
}
@@ -412,7 +390,6 @@ fn start_pageserver(
);
if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint {
let init_done_rx = init_done_rx;
let metrics_ctx = RequestContext::todo_child(
TaskKind::MetricsCollection,
// This task itself shouldn't download anything.
@@ -428,13 +405,6 @@ fn start_pageserver(
"consumption metrics collection",
true,
async move {
// first wait for initial load to complete before first iteration.
//
// this is because we only process active tenants and timelines, and the
// Timeline::get_current_logical_size will spawn the logical size calculation,
// which will not be rate-limited.
init_done_rx.wait().await;
pageserver::consumption_metrics::collect_metrics(
metric_collection_endpoint,
conf.metric_collection_interval,

View File

@@ -54,13 +54,12 @@ use serde::{Deserialize, Serialize};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn, Instrument};
use utils::completion;
use utils::serde_percent::Percent;
use crate::{
config::PageServerConf,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{self, storage_layer::{PersistentLayer, RemoteLayerDesc}, Timeline},
tenant::{self, storage_layer::PersistentLayer, Timeline},
};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -83,7 +82,6 @@ pub fn launch_disk_usage_global_eviction_task(
conf: &'static PageServerConf,
storage: GenericRemoteStorage,
state: Arc<State>,
init_done: completion::Barrier,
) -> anyhow::Result<()> {
let Some(task_config) = &conf.disk_usage_based_eviction else {
info!("disk usage based eviction task not configured");
@@ -100,9 +98,6 @@ pub fn launch_disk_usage_global_eviction_task(
"disk usage based eviction",
false,
async move {
// wait until initial load is complete, because we cannot evict from loading tenants.
init_done.wait().await;
disk_usage_eviction_task(
&state,
task_config,
@@ -329,7 +324,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
// If we get far enough in the list that we start to evict layers that are below
// the tenant's min-resident-size threshold, print a warning, and memorize the disk
// usage at that point, in 'usage_planned_min_resident_size_respecting'.
let mut batched: HashMap<_, Vec<Arc<RemoteLayerDesc>>> = HashMap::new();
let mut batched: HashMap<_, Vec<Arc<dyn PersistentLayer>>> = HashMap::new();
let mut warned = None;
let mut usage_planned = usage_pre;
for (i, (partition, candidate)) in candidates.into_iter().enumerate() {
@@ -434,7 +429,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
#[derive(Clone)]
struct EvictionCandidate {
timeline: Arc<Timeline>,
layer: Arc<RemoteLayerDesc>,
layer: Arc<dyn PersistentLayer>,
last_activity_ts: SystemTime,
}

View File

@@ -859,7 +859,7 @@ async fn handle_tenant_break(r: Request<Body>) -> Result<Response<Body>, ApiErro
.await
.map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?;
tenant.set_broken("broken from test".to_owned()).await;
tenant.set_broken("broken from test".to_owned());
json_response(StatusCode::OK, ())
}

View File

@@ -45,7 +45,6 @@ static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
pub use crate::metrics::preinitialize_metrics;
#[tracing::instrument]
pub async fn shutdown_pageserver(exit_code: i32) {
// Shut down the libpq endpoint task. This prevents new connections from
// being accepted.

View File

@@ -20,7 +20,6 @@ use storage_broker::BrokerClientChannel;
use tokio::sync::watch;
use tokio::task::JoinSet;
use tracing::*;
use utils::completion;
use utils::crashsafe::path_with_suffix_extension;
use std::cmp::min;
@@ -84,7 +83,6 @@ pub mod block_io;
pub mod disk_btree;
pub(crate) mod ephemeral_file;
pub mod layer_map;
pub mod layer_cache;
pub mod metadata;
mod par_fsync;
@@ -449,11 +447,6 @@ pub enum DeleteTimelineError {
Other(#[from] anyhow::Error),
}
pub enum SetStoppingError {
AlreadyStopping,
Broken,
}
struct RemoteStartupData {
index_part: IndexPart,
remote_metadata: TimelineMetadata,
@@ -652,17 +645,16 @@ impl Tenant {
"attach tenant",
false,
async move {
match tenant_clone.attach(&ctx).await {
Ok(()) => {
info!("attach finished, activating");
tenant_clone.activate(broker_client, None, &ctx);
}
let doit = async {
tenant_clone.attach(&ctx).await?;
tenant_clone.activate(broker_client, &ctx)?;
anyhow::Ok(())
};
match doit.await {
Ok(_) => {}
Err(e) => {
error!("attach failed, setting tenant state to Broken: {:?}", e);
tenant_clone.state.send_modify(|state| {
assert_eq!(*state, TenantState::Attaching, "the attach task owns the tenant state until activation is complete");
*state = TenantState::broken_from_reason(e.to_string());
});
tenant_clone.set_broken(e.to_string());
error!("error attaching tenant: {:?}", e);
}
}
Ok(())
@@ -679,8 +671,6 @@ impl Tenant {
///
/// Background task that downloads all data for a tenant and brings it to Active state.
///
/// No background tasks are started as part of this routine.
///
async fn attach(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
@@ -891,17 +881,14 @@ impl Tenant {
/// If the loading fails for some reason, the Tenant will go into Broken
/// state.
///
#[instrument(skip_all, fields(tenant_id=%tenant_id))]
#[instrument(skip(conf, remote_storage, ctx), fields(tenant_id=%tenant_id))]
pub fn spawn_load(
conf: &'static PageServerConf,
tenant_id: TenantId,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
init_done: Option<(completion::Completion, completion::Barrier)>,
ctx: &RequestContext,
) -> Arc<Tenant> {
debug_assert_current_span_has_tenant_id();
let tenant_conf = match Self::load_tenant_config(conf, tenant_id) {
Ok(conf) => conf,
Err(e) => {
@@ -933,27 +920,20 @@ impl Tenant {
"initial tenant load",
false,
async move {
// keep the sender alive as long as we have the initial load ongoing; it will be
// None for loads spawned after init_tenant_mgr.
let (_tx, rx) = if let Some((tx, rx)) = init_done {
(Some(tx), Some(rx))
} else {
(None, None)
let doit = async {
tenant_clone.load(&ctx).await?;
tenant_clone.activate(broker_client, &ctx)?;
anyhow::Ok(())
};
match tenant_clone.load(&ctx).await {
Ok(()) => {
debug!("load finished, activating");
tenant_clone.activate(broker_client, rx.as_ref(), &ctx);
}
match doit.await {
Ok(()) => {}
Err(err) => {
error!("load failed, setting tenant state to Broken: {err:?}");
tenant_clone.state.send_modify(|state| {
assert_eq!(*state, TenantState::Loading, "the loading task owns the tenant state until activation is complete");
*state = TenantState::broken_from_reason(err.to_string());
});
tenant_clone.set_broken(err.to_string());
error!("could not load tenant {tenant_id}: {err:?}");
}
}
Ok(())
info!("initial load for tenant {tenant_id} finished!");
Ok(())
}
.instrument({
let span = tracing::info_span!(parent: None, "load", tenant_id=%tenant_id);
@@ -962,6 +942,8 @@ impl Tenant {
}),
);
info!("spawned load into background");
tenant
}
@@ -969,11 +951,10 @@ impl Tenant {
/// Background task to load in-memory data structures for this tenant, from
/// files on disk. Used at pageserver startup.
///
/// No background tasks are started as part of this routine.
async fn load(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
debug!("loading tenant task");
info!("loading tenant task");
utils::failpoint_sleep_millis_async!("before-loading-tenant");
@@ -983,109 +964,102 @@ impl Tenant {
//
// Scan the directory, peek into the metadata file of each timeline, and
// collect a list of timelines and their ancestors.
let tenant_id = self.tenant_id;
let conf = self.conf;
let span = info_span!("blocking");
let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
let timelines_dir = self.conf.timelines_path(&self.tenant_id);
for entry in std::fs::read_dir(&timelines_dir).with_context(|| {
format!(
"Failed to list timelines directory for tenant {}",
self.tenant_id
)
})? {
let entry = entry.with_context(|| {
format!("cannot read timeline dir entry for {}", self.tenant_id)
})?;
let timeline_dir = entry.path();
let sorted_timelines: Vec<(_, _)> = tokio::task::spawn_blocking(move || {
let _g = span.entered();
let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
let timelines_dir = conf.timelines_path(&tenant_id);
for entry in
std::fs::read_dir(&timelines_dir).context("list timelines directory for tenant")?
{
let entry = entry.context("read timeline dir entry")?;
let timeline_dir = entry.path();
if crate::is_temporary(&timeline_dir) {
info!(
"Found temporary timeline directory, removing: {}",
timeline_dir.display()
if crate::is_temporary(&timeline_dir) {
info!(
"Found temporary timeline directory, removing: {}",
timeline_dir.display()
);
if let Err(e) = std::fs::remove_dir_all(&timeline_dir) {
error!(
"Failed to remove temporary directory '{}': {:?}",
timeline_dir.display(),
e
);
if let Err(e) = std::fs::remove_dir_all(&timeline_dir) {
error!(
"Failed to remove temporary directory '{}': {:?}",
timeline_dir.display(),
e
);
}
} else if is_uninit_mark(&timeline_dir) {
let timeline_uninit_mark_file = &timeline_dir;
info!(
"Found an uninit mark file {}, removing the timeline and its uninit mark",
timeline_uninit_mark_file.display()
);
let timeline_id = timeline_uninit_mark_file
.file_stem()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<TimelineId>()
.with_context(|| {
format!(
}
} else if is_uninit_mark(&timeline_dir) {
let timeline_uninit_mark_file = &timeline_dir;
info!(
"Found an uninit mark file {}, removing the timeline and its uninit mark",
timeline_uninit_mark_file.display()
);
let timeline_id = timeline_uninit_mark_file
.file_stem()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<TimelineId>()
.with_context(|| {
format!(
"Could not parse timeline id out of the timeline uninit mark name {}",
timeline_uninit_mark_file.display()
)
})?;
let timeline_dir = conf.timeline_path(&timeline_id, &tenant_id);
})?;
let timeline_dir = self.conf.timeline_path(&timeline_id, &self.tenant_id);
if let Err(e) =
remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
{
error!("Failed to clean up uninit marked timeline: {e:?}");
}
} else {
let timeline_id = timeline_dir
.file_name()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<TimelineId>()
.with_context(|| {
format!(
"Could not parse timeline id out of the timeline dir name {}",
timeline_dir.display()
)
})?;
let timeline_uninit_mark_file = self
.conf
.timeline_uninit_mark_file_path(self.tenant_id, timeline_id);
if timeline_uninit_mark_file.exists() {
info!(
"Found an uninit mark file for timeline {}/{}, removing the timeline and its uninit mark",
self.tenant_id, timeline_id
);
if let Err(e) =
remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
remove_timeline_and_uninit_mark(&timeline_dir, &timeline_uninit_mark_file)
{
error!("Failed to clean up uninit marked timeline: {e:?}");
}
} else {
let timeline_id = timeline_dir
.file_name()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<TimelineId>()
.with_context(|| {
format!(
"Could not parse timeline id out of the timeline dir name {}",
timeline_dir.display()
)
})?;
let timeline_uninit_mark_file =
conf.timeline_uninit_mark_file_path(tenant_id, timeline_id);
if timeline_uninit_mark_file.exists() {
info!(
%timeline_id,
"Found an uninit mark file, removing the timeline and its uninit mark",
);
if let Err(e) = remove_timeline_and_uninit_mark(
&timeline_dir,
&timeline_uninit_mark_file,
) {
error!("Failed to clean up uninit marked timeline: {e:?}");
}
continue;
}
continue;
}
let file_name = entry.file_name();
if let Ok(timeline_id) =
file_name.to_str().unwrap_or_default().parse::<TimelineId>()
{
let metadata = load_metadata(conf, timeline_id, tenant_id)
.context("failed to load metadata")?;
timelines_to_load.insert(timeline_id, metadata);
} else {
// A file or directory that doesn't look like a timeline ID
warn!(
"unexpected file or directory in timelines directory: {}",
file_name.to_string_lossy()
);
}
let file_name = entry.file_name();
if let Ok(timeline_id) =
file_name.to_str().unwrap_or_default().parse::<TimelineId>()
{
let metadata = load_metadata(self.conf, timeline_id, self.tenant_id)
.context("failed to load metadata")?;
timelines_to_load.insert(timeline_id, metadata);
} else {
// A file or directory that doesn't look like a timeline ID
warn!(
"unexpected file or directory in timelines directory: {}",
file_name.to_string_lossy()
);
}
}
}
// Sort the array of timeline IDs into tree-order, so that parent comes before
// all its children.
tree_sort_timelines(timelines_to_load)
})
.await
.context("load spawn_blocking")
.and_then(|res| res)?;
// Sort the array of timeline IDs into tree-order, so that parent comes before
// all its children.
let sorted_timelines = tree_sort_timelines(timelines_to_load)?;
// FIXME original collect_timeline_files contained one more check:
// 1. "Timeline has no ancestor and no layer files"
@@ -1095,7 +1069,7 @@ impl Tenant {
.with_context(|| format!("load local timeline {timeline_id}"))?;
}
trace!("Done");
info!("Done");
Ok(())
}
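
The "tree-order" sort mentioned in `load` (every parent before all of its children) can be sketched as follows. This is a minimal stand-alone illustration, not the body of `tree_sort_timelines`: plain `u32` ids stand in for `TimelineId`, an `Option<u32>` stands in for the ancestor recorded in the metadata, and returning an error for missing ancestors is an assumption made for the example.

```rust
use std::collections::HashMap;

/// Orders timelines so that every ancestor appears before its descendants.
/// Input maps a (hypothetical) timeline id to its optional ancestor id.
pub fn tree_sort(timelines: HashMap<u32, Option<u32>>) -> Result<Vec<u32>, String> {
    let total = timelines.len();
    // Timelines whose ancestor (if any) has already been emitted.
    let mut ready: Vec<u32> = Vec::new();
    // ancestor id -> children still waiting for that ancestor.
    let mut waiting: HashMap<u32, Vec<u32>> = HashMap::new();

    for (id, ancestor) in timelines {
        match ancestor {
            Some(parent) => waiting.entry(parent).or_default().push(id),
            None => ready.push(id),
        }
    }

    let mut sorted = Vec::with_capacity(total);
    while let Some(id) = ready.pop() {
        sorted.push(id);
        // Emitting a timeline unblocks its direct children.
        if let Some(children) = waiting.remove(&id) {
            ready.extend(children);
        }
    }

    if waiting.is_empty() {
        Ok(sorted)
    } else {
        // Whatever is left either points at an unknown ancestor or is part of a cycle.
        Err(format!(
            "{} timeline(s) have a missing ancestor or form a cycle",
            total - sorted.len()
        ))
    }
}
```

The relative order of siblings is unspecified here because it depends on `HashMap` iteration order; only the parent-before-child property is guaranteed.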
@@ -1686,193 +1660,127 @@ impl Tenant {
fn activate(
self: &Arc<Self>,
broker_client: BrokerClientChannel,
init_done: Option<&completion::Barrier>,
ctx: &RequestContext,
) {
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
let mut activating = false;
let mut result = Ok(());
self.state.send_modify(|current_state| {
use pageserver_api::models::ActivatingFrom;
match &*current_state {
TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping => {
panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
}
TenantState::Loading => {
*current_state = TenantState::Activating(ActivatingFrom::Loading);
}
TenantState::Attaching => {
*current_state = TenantState::Activating(ActivatingFrom::Attaching);
}
}
debug!(tenant_id = %self.tenant_id, "Activating tenant");
activating = true;
// Continue outside the closure. We need to grab timelines.lock()
// and we plan to turn it into a tokio::sync::Mutex in a future patch.
});
if activating {
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
tasks::start_background_loops(self, init_done);
let mut activated_timelines = 0;
for timeline in not_broken_timelines {
timeline.activate(broker_client.clone(), ctx);
activated_timelines += 1;
}
self.state.send_modify(move |current_state| {
assert!(
matches!(current_state, TenantState::Activating(_)),
"set_stopping and set_broken wait for us to leave Activating state",
);
*current_state = TenantState::Active;
let elapsed = self.loading_started_at.elapsed();
let total_timelines = timelines_accessor.len();
// log a lot of stuff, because some tenants sometimes suffer from user-visible
// times to activate. see https://github.com/neondatabase/neon/issues/4025
info!(
since_creation_millis = elapsed.as_millis(),
tenant_id = %self.tenant_id,
activated_timelines,
total_timelines,
post_state = <&'static str>::from(&*current_state),
"activation attempt finished"
);
});
}
}
/// Change tenant status to Stopping, to mark that it is being shut down.
///
/// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
///
/// This function is not cancel-safe!
pub async fn set_stopping(&self) -> Result<(), SetStoppingError> {
let mut rx = self.state.subscribe();
// cannot stop before we're done activating, so wait out until we're done activating
rx.wait_for(|state| match state {
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
info!(
"waiting for {} to turn Active|Broken|Stopping",
<&'static str>::from(state)
);
false
}
TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping {} => true,
})
.await
.expect("cannot drop self.state while on a &self method");
// we now know we're done activating, let's see whether this task is the winner to transition into Stopping
let mut err = None;
let stopping = self.state.send_if_modified(|current_state| match current_state {
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
}
TenantState::Active => {
// FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
// are created after the transition to Stopping. That's harmless, as the Timelines
// won't be accessible to anyone afterwards, because the Tenant is in Stopping state.
*current_state = TenantState::Stopping;
// Continue stopping outside the closure. We need to grab timelines.lock()
// and we plan to turn it into a tokio::sync::Mutex in a future patch.
true
}
TenantState::Broken { reason, .. } => {
info!(
"Cannot set tenant to Stopping state, it is in Broken state due to: {reason}"
);
err = Some(SetStoppingError::Broken);
false
}
TenantState::Stopping => {
info!("Tenant is already in Stopping state");
err = Some(SetStoppingError::AlreadyStopping);
false
}
});
match (stopping, err) {
(true, None) => {} // continue
(false, Some(err)) => return Err(err),
(true, Some(_)) => unreachable!(
"send_if_modified closure must error out if not transitioning to Stopping"
),
(false, None) => unreachable!(
"send_if_modified closure must return true if transitioning to Stopping"
),
}
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
for timeline in not_broken_timelines {
timeline.set_state(TimelineState::Stopping);
}
Ok(())
}
/// Method for tenant::mgr to transition us into Broken state in case of a late failure in
/// `remove_tenant_from_memory`
///
/// This function waits for the tenant to become active if it isn't already, before transitioning it into Broken state.
///
/// In tests, we also use this to set tenants to Broken state on purpose.
pub(crate) async fn set_broken(&self, reason: String) {
let mut rx = self.state.subscribe();
// The load & attach routines own the tenant state until it has reached `Active`.
// So, wait until it's done.
rx.wait_for(|state| match state {
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
info!(
"waiting for {} to turn Active|Broken|Stopping",
<&'static str>::from(state)
);
false
}
TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping {} => true,
})
.await
.expect("cannot drop self.state while on a &self method");
// we now know we're done activating, let's see whether this task is the winner to transition into Broken
self.state.send_modify(|current_state| {
match *current_state {
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
}
TenantState::Active => {
if cfg!(feature = "testing") {
warn!("Changing Active tenant to Broken state, reason: {}", reason);
*current_state = TenantState::broken_from_reason(reason);
} else {
unreachable!("not allowed to call set_broken on Active tenants in non-testing builds")
// activate() was called on an already Active tenant. Shouldn't happen.
result = Err(anyhow::anyhow!("Tenant is already active"));
}
TenantState::Broken { reason, .. } => {
// This shouldn't happen either
result = Err(anyhow::anyhow!(
"Could not activate tenant because it is in broken state due to: {reason}",
));
}
TenantState::Stopping => {
// The tenant was detached, or system shutdown was requested, while we were
// loading or attaching the tenant.
info!("Tenant is already in Stopping state, skipping activation");
}
TenantState::Loading | TenantState::Attaching => {
*current_state = TenantState::Active;
debug!(tenant_id = %self.tenant_id, "Activating tenant");
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
tasks::start_background_loops(self);
let mut activated_timelines = 0;
for timeline in not_broken_timelines {
timeline.activate(broker_client.clone(), ctx);
activated_timelines += 1;
}
let elapsed = self.loading_started_at.elapsed();
let total_timelines = timelines_accessor.len();
// log a lot of stuff, because some tenants sometimes suffer from user-visible
// times to activate. see https://github.com/neondatabase/neon/issues/4025
info!(
since_creation_millis = elapsed.as_millis(),
tenant_id = %self.tenant_id,
activated_timelines,
total_timelines,
post_state = <&'static str>::from(&*current_state),
"activation attempt finished"
);
}
}
});
result
}
/// Change tenant status to Stopping, to mark that it is being shut down
pub fn set_stopping(&self) {
self.state.send_modify(|current_state| {
match current_state {
TenantState::Active | TenantState::Loading | TenantState::Attaching => {
*current_state = TenantState::Stopping;
// FIXME: If the tenant is still Loading or Attaching, new timelines
// might be created after this. That's harmless, as the Timelines
// won't be accessible to anyone, when the Tenant is in Stopping
// state.
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
for timeline in not_broken_timelines {
timeline.set_state(TimelineState::Stopping);
}
}
TenantState::Broken { reason, .. } => {
info!("Cannot set tenant to Stopping state, it is in Broken state due to: {reason}");
}
TenantState::Stopping => {
// The tenant was detached, or system shutdown was requested, while we were
// loading or attaching the tenant.
info!("Tenant is already in Stopping state");
}
}
});
}
pub fn set_broken(&self, reason: String) {
self.state.send_modify(|current_state| {
match *current_state {
TenantState::Active => {
// The Broken state is currently only used for fatal errors that happen
// while loading or attaching a tenant. A tenant that has already been
// activated should never be marked as broken. We cope with it the best
// we can, but it shouldn't happen.
warn!("Changing Active tenant to Broken state, reason: {}", reason);
*current_state = TenantState::broken_from_reason(reason);
}
TenantState::Broken { .. } => {
// This shouldn't happen either
warn!("Tenant is already in Broken state");
}
// This is the only "expected" path, any other path is a bug.
TenantState::Stopping => {
// This shouldn't happen either
warn!(
"Marking Stopping tenant as Broken state, reason: {}",
reason
);
*current_state = TenantState::broken_from_reason(reason);
}
}
TenantState::Loading | TenantState::Attaching => {
info!("Setting tenant as Broken state, reason: {}", reason);
*current_state = TenantState::broken_from_reason(reason);
}
}
});
}
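
The transitions performed by the synchronous `activate()`, `set_stopping()` and `set_broken()` shown in this hunk can be summarised in a small model. This is a simplified sketch under stated assumptions: the `State` enum, the `bool` return values, and the free functions are hypothetical, and the real code additionally touches timelines, background loops, and a `watch` channel.

```rust
/// Hypothetical, simplified model of the tenant states without `Activating`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum State {
    Loading,
    Attaching,
    Active,
    Stopping,
    Broken { reason: String },
}

/// Loading / Attaching become Active; Stopping is left alone; anything else is an error.
pub fn activate(state: &mut State) -> Result<(), String> {
    match state {
        State::Active => Err("Tenant is already active".to_string()),
        State::Broken { reason } => Err(format!(
            "Could not activate tenant because it is in broken state due to: {reason}"
        )),
        // Detach or shutdown won the race; skip activation without failing.
        State::Stopping => Ok(()),
        State::Loading | State::Attaching => {
            *state = State::Active;
            Ok(())
        }
    }
}

/// Returns true if the state changed to Stopping.
pub fn set_stopping(state: &mut State) -> bool {
    match state {
        State::Active | State::Loading | State::Attaching => {
            *state = State::Stopping;
            true
        }
        State::Broken { .. } | State::Stopping => false,
    }
}

/// Returns true if the state changed to Broken; an already Broken tenant keeps its reason.
pub fn set_broken(state: &mut State, reason: &str) -> bool {
    match state {
        State::Broken { .. } => false,
        _ => {
            *state = State::Broken { reason: reason.to_string() };
            true
        }
    }
}
```

For example, a `Loading` tenant that is stopped before `activate` runs stays in `Stopping`, and `activate` returns `Ok(())` without starting anything.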
@@ -1885,7 +1793,7 @@ impl Tenant {
loop {
let current_state = receiver.borrow_and_update().clone();
match current_state {
TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => {
TenantState::Loading | TenantState::Attaching => {
// in these states, there's a chance that we can reach ::Active
receiver.changed().await.map_err(
|_e: tokio::sync::watch::error::RecvError| {

View File

@@ -1,34 +0,0 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use super::storage_layer::{LayerFileName, PersistentLayer, RemoteLayerDesc};
pub struct LayerCache {
layers: Mutex<HashMap<LayerFileName, Arc<dyn PersistentLayer>>>,
}
impl LayerCache {
pub fn new() -> Self {
Self {
layers: Mutex::new(HashMap::new()),
}
}
pub fn get(&self, layer_fname: &LayerFileName) -> Option<Arc<dyn PersistentLayer>> {
let guard: std::sync::MutexGuard<HashMap<LayerFileName, Arc<dyn PersistentLayer>>> =
self.layers.lock().unwrap();
guard.get(layer_fname).cloned()
}
pub fn contains(&self, layer_fname: &LayerFileName) -> bool {
let guard = self.layers.lock().unwrap();
guard.contains_key(layer_fname)
}
pub fn insert(&self, layer_fname: LayerFileName, persistent_layer: Arc<dyn PersistentLayer>) {
let mut guard = self.layers.lock().unwrap();
guard.insert(layer_fname, persistent_layer);
}
}

View File

@@ -61,7 +61,6 @@ use historic_layer_coverage::BufferedHistoricLayerCoverage;
pub use historic_layer_coverage::Replacement;
use super::storage_layer::range_eq;
use super::storage_layer::PersistentLayer;
///
/// LayerMap tracks what layers exist on a timeline.
@@ -139,19 +138,24 @@ where
self.layer_map.remove_historic_noflush(layer)
}
/// Ensure the downloaded layer matches existing layer.
/// Replaces existing layer iff it is the `expected`.
///
/// Returned `Replacement` describes succeeding in checking or the reason why it could not
/// If the expected layer has been removed it will not be inserted by this function.
///
/// Returned `Replacement` describes succeeding in replacement or the reason why it could not
/// be done.
pub fn ensure_consistent(
&self,
///
/// TODO replacement can be done without buffering and rebuilding layer map updates.
/// One way to do that is to add a layer of indirection for returned values, so
/// that we can replace values only by updating a hashmap.
pub fn replace_historic(
&mut self,
expected: &Arc<L>,
new: Arc<dyn PersistentLayer>,
new: Arc<L>,
) -> anyhow::Result<Replacement<Arc<L>>> {
fail::fail_point!("layermap-replace-notfound", |_| Ok(Replacement::NotFound));
self.layer_map
.ensure_consistent_noflush(expected, new)
self.layer_map.replace_historic_noflush(expected, new)
}
// We will flush on drop anyway, but this method makes it
@@ -305,16 +309,16 @@ where
}
}
pub(self) fn ensure_consistent_noflush(
&self,
pub(self) fn replace_historic_noflush(
&mut self,
expected: &Arc<L>,
new: Arc<dyn PersistentLayer>,
new: Arc<L>,
) -> anyhow::Result<Replacement<Arc<L>>> {
let key = historic_layer_coverage::LayerKey::from(&**expected);
let other = historic_layer_coverage::LayerKey::from(&*new);
let expected_l0 = Self::is_l0(expected);
let new_l0 = LayerMap::<dyn PersistentLayer>::is_l0(&*new);
let new_l0 = Self::is_l0(&new);
anyhow::ensure!(
key == other,
@@ -341,7 +345,17 @@ where
None
};
Ok(Replacement::Replaced { in_buffered: false })
let replaced = self.historic.replace(&key, new.clone(), |existing| {
Self::compare_arced_layers(existing, expected)
});
if let Replacement::Replaced { .. } = &replaced {
if let Some(index) = l0_index {
self.l0_delta_layers[index] = new;
}
}
Ok(replaced)
}
/// Helper function for BatchedUpdates::drop.

View File

@@ -10,7 +10,6 @@ use tokio::fs;
use anyhow::Context;
use once_cell::sync::Lazy;
use tokio::sync::RwLock;
use tokio::task::JoinSet;
use tracing::*;
use remote_storage::GenericRemoteStorage;
@@ -20,12 +19,9 @@ use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::{
create_tenant_files, CreateTenantFilesMode, SetStoppingError, Tenant, TenantState,
};
use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState};
use crate::IGNORED_TENANT_FILE_NAME;
use utils::completion;
use utils::fs_ext::PathExt;
use utils::id::{TenantId, TimelineId};
@@ -67,7 +63,6 @@ pub async fn init_tenant_mgr(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
init_done: (completion::Completion, completion::Barrier),
) -> anyhow::Result<()> {
// Scan local filesystem for attached tenants
let tenants_dir = conf.tenants_path();
@@ -124,7 +119,6 @@ pub async fn init_tenant_mgr(
&tenant_dir_path,
broker_client.clone(),
remote_storage.clone(),
Some(init_done.clone()),
&ctx,
) {
Ok(tenant) => {
@@ -160,7 +154,6 @@ pub fn schedule_local_tenant_processing(
tenant_path: &Path,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
init_done: Option<(completion::Completion, completion::Barrier)>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
anyhow::ensure!(
@@ -214,14 +207,7 @@ pub fn schedule_local_tenant_processing(
} else {
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
// Start loading the tenant into memory. It will initially be in Loading state.
Tenant::spawn_load(
conf,
tenant_id,
broker_client,
remote_storage,
init_done,
ctx,
)
Tenant::spawn_load(conf, tenant_id, broker_client, remote_storage, ctx)
};
Ok(tenant)
}
@@ -236,7 +222,6 @@ pub fn schedule_local_tenant_processing(
/// That could be easily misinterpreted by control plane, the consumer of the
/// management API. For example, it could attach the tenant on a different pageserver.
/// We would then be in split-brain once this pageserver restarts.
#[instrument]
pub async fn shutdown_all_tenants() {
// Prevent new tenants from being created.
let tenants_to_shut_down = {
@@ -259,65 +244,15 @@ pub async fn shutdown_all_tenants() {
}
};
// Set tenant (and its timelines) to Stopping state.
//
// Since we can only transition into Stopping state after activation is complete,
// run it in a JoinSet so all tenants have a chance to stop before we get SIGKILLed.
//
// Transitioning tenants to Stopping state has a couple of non-obvious side effects:
// 1. Lock out any new requests to the tenants.
// 2. Signal cancellation to WAL receivers (we wait on it below).
// 3. Signal cancellation for other tenant background loops.
// 4. ???
//
// The waiting for the cancellation is not done uniformly.
// We certainly wait for WAL receivers to shut down.
// That is necessary so that no new data comes in before the freeze_and_flush.
// But the tenant background loops are joined-on in our caller.
// It's messed up.
let mut join_set = JoinSet::new();
let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len());
for (tenant_id, tenant) in tenants_to_shut_down {
join_set.spawn(
async move {
match tenant.set_stopping().await {
Ok(()) => debug!("tenant successfully stopped"),
Err(SetStoppingError::Broken) => {
info!("tenant is broken, so stopping failed, freeze_and_flush is likely going to make noise as well");
},
Err(SetStoppingError::AlreadyStopping) => {
// our task_mgr::shutdown_tasks are going to coalesce on that just fine
}
}
tenant
}
.instrument(info_span!("set_stopping", %tenant_id)),
);
}
let mut panicked = 0;
while let Some(res) = join_set.join_next().await {
match res {
Err(join_error) if join_error.is_cancelled() => {
unreachable!("we are not cancelling any of the futures");
}
Err(join_error) if join_error.is_panic() => {
// cannot really do anything, as this panic is likely a bug
panicked += 1;
}
Err(join_error) => {
warn!("unknown kind of JoinError: {join_error}");
}
Ok(tenant) => tenants_to_freeze_and_flush.push(tenant),
for (_, tenant) in tenants_to_shut_down {
if tenant.is_active() {
// updates tenant state, forbidding new GC and compaction iterations from starting
tenant.set_stopping();
tenants_to_freeze_and_flush.push(tenant);
}
}
if panicked > 0 {
warn!(panicked, "observed panics while stopping tenants");
}
// Shut down all existing walreceiver connections and stop accepting the new ones.
task_mgr::shutdown_tasks(Some(TaskKind::WalReceiverManager), None, None).await;
@@ -329,30 +264,12 @@ pub async fn shutdown_all_tenants() {
// should be no more activity in any of the repositories.
//
// On error, log it but continue with the shutdown for other tenants.
let mut join_set = tokio::task::JoinSet::new();
for tenant in tenants_to_freeze_and_flush {
let tenant_id = tenant.tenant_id();
debug!("shutdown tenant {tenant_id}");
join_set.spawn(
async move {
if let Err(err) = tenant.freeze_and_flush().await {
warn!("Could not checkpoint tenant during shutdown: {err:?}");
}
}
.instrument(info_span!("freeze_and_flush", %tenant_id)),
);
}
while let Some(next) = join_set.join_next().await {
match next {
Ok(()) => {}
Err(join_error) if join_error.is_cancelled() => {
unreachable!("no cancelling")
}
Err(join_error) if join_error.is_panic() => { /* reported already */ }
Err(join_error) => warn!("unknown kind of JoinError: {join_error}"),
if let Err(err) = tenant.freeze_and_flush().await {
error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}");
}
}
}
@@ -374,7 +291,7 @@ pub async fn create_tenant(
// See https://github.com/neondatabase/neon/issues/4233
let created_tenant =
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, None, ctx)?;
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -520,7 +437,7 @@ pub async fn load_tenant(
.with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
}
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, None, ctx)
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, ctx)
.with_context(|| {
format!("Failed to schedule tenant processing in path {tenant_path:?}")
})?;
@@ -593,7 +510,7 @@ pub async fn attach_tenant(
.context("check for attach marker file existence")?;
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), None, ctx)?;
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -672,23 +589,13 @@ where
{
let tenants_accessor = TENANTS.write().await;
match tenants_accessor.get(&tenant_id) {
Some(tenant) => {
let tenant = Arc::clone(tenant);
// don't hold TENANTS lock while set_stopping waits for activation to finish
drop(tenants_accessor);
match tenant.set_stopping().await {
Ok(()) => {
// we won, continue stopping procedure
}
Err(SetStoppingError::Broken) => {
// continue the procedure, let's hope the closure can deal with broken tenants
}
Err(SetStoppingError::AlreadyStopping) => {
// the tenant is already stopping or broken, don't do anything
return Err(TenantStateError::IsStopping(tenant_id));
}
}
}
Some(tenant) => match tenant.current_state() {
TenantState::Attaching
| TenantState::Loading
| TenantState::Broken { .. }
| TenantState::Active => tenant.set_stopping(),
TenantState::Stopping => return Err(TenantStateError::IsStopping(tenant_id)),
},
None => return Err(TenantStateError::NotFound(tenant_id)),
}
}
@@ -713,7 +620,7 @@ where
let tenants_accessor = TENANTS.read().await;
match tenants_accessor.get(&tenant_id) {
Some(tenant) => {
tenant.set_broken(e.to_string()).await;
tenant.set_broken(e.to_string());
}
None => {
warn!("Tenant {tenant_id} got removed from memory");

View File

@@ -37,7 +37,7 @@ pub use delta_layer::{DeltaLayer, DeltaLayerWriter};
pub use filename::{DeltaFileName, ImageFileName, LayerFileName};
pub use image_layer::{ImageLayer, ImageLayerWriter};
pub use inmemory_layer::InMemoryLayer;
pub use remote_layer::RemoteLayerDesc;
pub use remote_layer::RemoteLayer;
use super::layer_map::BatchedUpdates;
@@ -431,6 +431,14 @@ pub trait PersistentLayer: Layer {
/// Permanently remove this layer from disk.
fn delete_resident_layer_file(&self) -> Result<()>;
fn downcast_remote_layer(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
None
}
fn is_remote_layer(&self) -> bool {
false
}
/// Returns None if the layer file size is not known.
///
/// Should not change over the lifetime of the layer object because
@@ -442,6 +450,16 @@ pub trait PersistentLayer: Layer {
fn access_stats(&self) -> &LayerAccessStats;
}
pub fn downcast_remote_layer(
layer: &Arc<dyn PersistentLayer>,
) -> Option<std::sync::Arc<RemoteLayer>> {
if layer.is_remote_layer() {
Arc::clone(layer).downcast_remote_layer()
} else {
None
}
}
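
The two trait methods and the free function above implement a hand-rolled downcast: the trait supplies defaults that say "not remote", and only the remote type overrides them. A self-contained sketch of the same pattern follows. `Layer`, `Resident`, `Remote` and `as_remote` are stand-in names chosen for illustration, not the real pageserver types.

```rust
use std::sync::Arc;

/// Stand-in for the persistent layer trait (hypothetical, heavily simplified).
trait Layer {
    fn name(&self) -> &str;

    /// Default: this layer is not remote, so the downcast yields nothing.
    fn downcast_remote(self: Arc<Self>) -> Option<Arc<Remote>> {
        None
    }

    fn is_remote(&self) -> bool {
        false
    }
}

/// A layer whose file is present on local disk; keeps the trait defaults.
struct Resident {
    name: String,
}

/// A placeholder for a layer that exists only in remote storage.
struct Remote {
    name: String,
}

impl Layer for Resident {
    fn name(&self) -> &str {
        &self.name
    }
}

impl Layer for Remote {
    fn name(&self) -> &str {
        &self.name
    }

    // Here `Self` is `Remote`, so the Arc can be handed back unchanged.
    fn downcast_remote(self: Arc<Self>) -> Option<Arc<Remote>> {
        Some(self)
    }

    fn is_remote(&self) -> bool {
        true
    }
}

/// Check the cheap flag first so the Arc is only cloned for remote layers.
fn as_remote(layer: &Arc<dyn Layer>) -> Option<Arc<Remote>> {
    if layer.is_remote() {
        Arc::clone(layer).downcast_remote()
    } else {
        None
    }
}
```

This avoids `std::any::Any` at the cost of naming the concrete type in the trait, which seems acceptable when only one implementor ever needs to be recovered.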
/// Holds metadata about a layer without any content. Used mostly for testing.
///
/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a

View File

@@ -30,7 +30,6 @@ use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{
PersistentLayer, ValueReconstructResult, ValueReconstructState,
};
@@ -58,7 +57,7 @@ use utils::{
use super::{
DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter,
LayerKeyIter, PathOrConf, RemoteLayerDesc,
LayerKeyIter, PathOrConf,
};
///
@@ -664,17 +663,6 @@ impl DeltaLayer {
&self.layer_name(),
)
}
/// Create layer descriptor for this delta layer
pub fn layer_desc(&self) -> RemoteLayerDesc {
RemoteLayerDesc::new_delta(
self.tenant_id,
self.timeline_id,
&self.layer_name(),
&LayerFileMetadata::new(self.file_size()),
LayerAccessStats::empty_will_record_residence_event_later(),
)
}
}
/// A builder object for constructing a new delta layer.

View File

@@ -26,7 +26,6 @@ use crate::repository::{Key, KEY_SIZE};
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{
LayerAccessStats, PersistentLayer, ValueReconstructResult, ValueReconstructState,
};
@@ -54,7 +53,7 @@ use utils::{
};
use super::filename::{ImageFileName, LayerFileName};
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf, RemoteLayerDesc};
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf};
///
/// Header stored in the beginning of the file
@@ -465,17 +464,6 @@ impl ImageLayer {
&self.layer_name(),
)
}
/// Create layer descriptor for this image layer
pub fn layer_desc(&self) -> RemoteLayerDesc {
RemoteLayerDesc::new_img(
self.tenant_id,
self.timeline_id,
&self.layer_name(),
&LayerFileMetadata::new(self.file_size()),
LayerAccessStats::empty_will_record_residence_event_later(),
)
}
}
/// A builder object for constructing a new image layer.

View File

@@ -1,4 +1,4 @@
//! A RemoteLayerDesc is an in-memory placeholder for a layer file that exists
//! A RemoteLayer is an in-memory placeholder for a layer file that exists
//! in remote storage.
//!
use crate::config::PageServerConf;
@@ -25,19 +25,19 @@ use super::{
LayerResidenceStatus, PersistentLayer,
};
/// RemoteLayerDesc is a not yet downloaded [`ImageLayer`] or
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
/// [`crate::storage_layer::DeltaLayer`].
///
/// RemoteLayerDesc might be downloaded on-demand during operations which are
/// RemoteLayer might be downloaded on-demand during operations which are
/// allowed to download remote layers, during which it gets replaced with a
/// concrete `DeltaLayer` or `ImageLayer`.
///
/// See: [`crate::context::RequestContext`] for authorization to download
pub struct RemoteLayerDesc {
pub(crate) tenantid: TenantId,
pub(crate) timelineid: TimelineId,
pub(crate) key_range: Range<Key>,
pub(crate) lsn_range: Range<Lsn>,
pub struct RemoteLayer {
tenantid: TenantId,
timelineid: TimelineId,
key_range: Range<Key>,
lsn_range: Range<Lsn>,
pub file_name: LayerFileName,
@@ -54,7 +54,7 @@ pub struct RemoteLayerDesc {
/// Has `LayerMap::replace` failed for this (true) or not (false).
///
/// Used together with [`ongoing_download`] semaphore in `Timeline::download_remote_layer`.
/// The field is used to mark a RemoteLayerDesc permanently (until restart or ignore+load)
/// The field is used to mark a RemoteLayer permanently (until restart or ignore+load)
/// unprocessable, because a LayerMap::replace failed.
///
/// It is very unlikely to accumulate these in the Timeline's LayerMap, but having this avoids
@@ -63,9 +63,9 @@ pub struct RemoteLayerDesc {
pub(crate) download_replacement_failure: std::sync::atomic::AtomicBool,
}
impl std::fmt::Debug for RemoteLayerDesc {
impl std::fmt::Debug for RemoteLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RemoteLayerDesc")
f.debug_struct("RemoteLayer")
.field("file_name", &self.file_name)
.field("layer_metadata", &self.layer_metadata)
.field("is_incremental", &self.is_incremental)
@@ -73,7 +73,7 @@ impl std::fmt::Debug for RemoteLayerDesc {
}
}
impl Layer for RemoteLayerDesc {
impl Layer for RemoteLayer {
fn get_key_range(&self) -> Range<Key> {
self.key_range.clone()
}
@@ -119,7 +119,7 @@ impl Layer for RemoteLayerDesc {
}
}
impl PersistentLayer for RemoteLayerDesc {
impl PersistentLayer for RemoteLayer {
fn get_tenant_id(&self) -> TenantId {
self.tenantid
}
@@ -160,6 +160,14 @@ impl PersistentLayer for RemoteLayerDesc {
bail!("remote layer has no layer file");
}
fn downcast_remote_layer<'a>(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
Some(self)
}
fn is_remote_layer(&self) -> bool {
true
}
fn file_size(&self) -> u64 {
self.layer_metadata.file_size()
}
@@ -193,15 +201,15 @@ impl PersistentLayer for RemoteLayerDesc {
}
}
impl RemoteLayerDesc {
impl RemoteLayer {
pub fn new_img(
tenantid: TenantId,
timelineid: TimelineId,
fname: &ImageFileName,
layer_metadata: &LayerFileMetadata,
access_stats: LayerAccessStats,
) -> RemoteLayerDesc {
RemoteLayerDesc {
) -> RemoteLayer {
RemoteLayer {
tenantid,
timelineid,
key_range: fname.key_range.clone(),
@@ -222,8 +230,8 @@ impl RemoteLayerDesc {
fname: &DeltaFileName,
layer_metadata: &LayerFileMetadata,
access_stats: LayerAccessStats,
) -> RemoteLayerDesc {
RemoteLayerDesc {
) -> RemoteLayer {
RemoteLayer {
tenantid,
timelineid,
key_range: fname.key_range.clone(),

View File

@@ -12,9 +12,8 @@ use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::{Tenant, TenantState};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::completion;
pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completion::Barrier>) {
pub fn start_background_loops(tenant: &Arc<Tenant>) {
let tenant_id = tenant.tenant_id;
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
@@ -25,9 +24,7 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completio
false,
{
let tenant = Arc::clone(tenant);
let init_done = init_done.cloned();
async move {
completion::Barrier::maybe_wait(init_done).await;
compaction_loop(tenant)
.instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
.await;
@@ -44,9 +41,7 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completio
false,
{
let tenant = Arc::clone(tenant);
let init_done = init_done.cloned();
async move {
completion::Barrier::maybe_wait(init_done).await;
gc_loop(tenant)
.instrument(info_span!("gc_loop", tenant_id = %tenant_id))
.await;

View File

@@ -35,7 +35,7 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
use crate::tenant::storage_layer::{
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer,
LayerAccessStats, LayerFileName, RemoteLayerDesc,
LayerAccessStats, LayerFileName, RemoteLayer,
};
use crate::tenant::{
ephemeral_file::is_ephemeral_file,
@@ -77,7 +77,6 @@ use self::eviction_task::EvictionTaskTimelineState;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::layer_cache::LayerCache;
use super::layer_map::BatchedUpdates;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
@@ -120,7 +119,7 @@ pub struct Timeline {
pub pg_version: u32,
pub(super) layers: RwLock<LayerMap<RemoteLayerDesc>>,
pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>,
/// Set of key ranges which should be covered by image layers to
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
@@ -242,8 +241,6 @@ pub struct Timeline {
pub delete_lock: tokio::sync::Mutex<bool>,
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
layer_cache: Arc<LayerCache>,
}
/// Internal structure to hold all data needed for logical size calculation.
@@ -1010,22 +1007,20 @@ impl Timeline {
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(remote_layer_desc) = self.find_layer_desc(layer_file_name) else { return Ok(None) };
if self.layer_cache.contains(&remote_layer_desc.filename()) {
return Ok(Some(false));
}
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) };
if self.remote_client.is_none() {
return Ok(Some(false));
}
self.download_remote_layer(remote_layer_desc).await?;
self.download_remote_layer(remote_layer).await?;
Ok(Some(true))
}
/// Like [`evict_layer_batch`], but for just one layer.
/// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(local_layer) = self.find_layer_desc(layer_file_name) else { return Ok(None) };
let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let remote_client = self
.remote_client
.as_ref()
@@ -1052,7 +1047,7 @@ impl Timeline {
pub async fn evict_layers(
&self,
_: &GenericRemoteStorage,
layers_to_evict: &[Arc<RemoteLayerDesc>],
layers_to_evict: &[Arc<dyn PersistentLayer>],
cancel: CancellationToken,
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
let remote_client = self.remote_client.clone().expect(
@@ -1087,7 +1082,7 @@ impl Timeline {
async fn evict_layer_batch(
&self,
remote_client: &Arc<RemoteTimelineClient>,
layers_to_evict: &[Arc<RemoteLayerDesc>],
layers_to_evict: &[Arc<dyn PersistentLayer>],
cancel: CancellationToken,
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
// ensure that the layers have finished uploading
@@ -1136,12 +1131,12 @@ impl Timeline {
fn evict_layer_batch_impl(
&self,
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
local_layer: &Arc<RemoteLayerDesc>,
batch_updates: &mut BatchedUpdates<'_, RemoteLayerDesc>,
local_layer: &Arc<dyn PersistentLayer>,
batch_updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
) -> anyhow::Result<bool> {
use super::layer_map::Replacement;
if !self.layer_cache.contains(&local_layer.filename()) {
if local_layer.is_remote_layer() {
// TODO(issue #3851): consider returning an err here instead of false,
// which is the same out the match later
return Ok(false);
@@ -1168,7 +1163,7 @@ impl Timeline {
let layer_metadata = LayerFileMetadata::new(layer_file_size);
let new_remote_layer = Arc::new(match local_layer.filename() {
LayerFileName::Image(image_name) => RemoteLayerDesc::new_img(
LayerFileName::Image(image_name) => RemoteLayer::new_img(
self.tenant_id,
self.timeline_id,
&image_name,
@@ -1177,7 +1172,7 @@ impl Timeline {
.access_stats()
.clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted),
),
LayerFileName::Delta(delta_name) => RemoteLayerDesc::new_delta(
LayerFileName::Delta(delta_name) => RemoteLayer::new_delta(
self.tenant_id,
self.timeline_id,
&delta_name,
@@ -1188,7 +1183,6 @@ impl Timeline {
),
});
/*
let replaced = match batch_updates.replace_historic(local_layer, new_remote_layer)? {
Replacement::Replaced { .. } => {
if let Err(e) = local_layer.delete_resident_layer_file() {
@@ -1239,10 +1233,8 @@ impl Timeline {
false
}
};
*/
// Ok(replaced)
Ok(true)
Ok(replaced)
}
}
@@ -1427,8 +1419,6 @@ impl Timeline {
EvictionTaskTimelineState::default(),
),
delete_lock: tokio::sync::Mutex::new(false),
layer_cache: Arc::new(LayerCache::new()),
};
result.repartition_threshold = result.get_checkpoint_distance() / 10;
result
@@ -1575,12 +1565,9 @@ impl Timeline {
LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident),
);
let remote_desc = layer.layer_desc();
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
self.layer_cache.insert(layer.filename(), Arc::new(layer));
updates.insert_historic(Arc::new(remote_desc));
updates.insert_historic(Arc::new(layer));
num_layers += 1;
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
// Create a DeltaLayer struct for each delta file.
@@ -1612,9 +1599,7 @@ impl Timeline {
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
let remote_desc = layer.layer_desc();
self.layer_cache.insert(layer.filename(), Arc::new(layer));
updates.insert_historic(Arc::new(remote_desc));
updates.insert_historic(Arc::new(layer));
num_layers += 1;
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
// ignore these
@@ -1659,9 +1644,9 @@ impl Timeline {
async fn create_remote_layers(
&self,
index_part: &IndexPart,
local_layers: HashMap<LayerFileName, Arc<RemoteLayerDesc>>,
local_layers: HashMap<LayerFileName, Arc<dyn PersistentLayer>>,
up_to_date_disk_consistent_lsn: Lsn,
) -> anyhow::Result<HashMap<LayerFileName, Arc<RemoteLayerDesc>>> {
) -> anyhow::Result<HashMap<LayerFileName, Arc<dyn PersistentLayer>>> {
// Are we missing some files that are present in remote storage?
// Create RemoteLayer instances for them.
let mut local_only_layers = local_layers;
@@ -1740,7 +1725,7 @@ impl Timeline {
continue;
}
let remote_layer = RemoteLayerDesc::new_img(
let remote_layer = RemoteLayer::new_img(
self.tenant_id,
self.timeline_id,
imgfilename,
@@ -1768,7 +1753,7 @@ impl Timeline {
);
continue;
}
let remote_layer = RemoteLayerDesc::new_delta(
let remote_layer = RemoteLayer::new_delta(
self.tenant_id,
self.timeline_id,
deltafilename,
@@ -1925,7 +1910,6 @@ impl Timeline {
// no cancellation here, because nothing really waits for this to complete compared
// to spawn_ondemand_logical_size_calculation.
let cancel = CancellationToken::new();
let calculated_size = match self_clone
.logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx, cancel)
.await
@@ -2174,7 +2158,7 @@ impl Timeline {
}
}
fn find_layer_desc(&self, layer_file_name: &str) -> Option<Arc<RemoteLayerDesc>> {
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
for historic_layer in self.layers.read().unwrap().iter_historic_layers() {
let historic_layer_name = historic_layer.filename().file_name();
if layer_file_name == historic_layer_name {
@@ -2191,10 +2175,10 @@ impl Timeline {
&self,
// we cannot remove layers otherwise, since gc and compaction will race
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
layer: Arc<RemoteLayerDesc>,
updates: &mut BatchedUpdates<'_, RemoteLayerDesc>,
layer: Arc<dyn PersistentLayer>,
updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
) -> anyhow::Result<()> {
if self.layer_cache.contains(&layer.filename()) {
if !layer.is_remote_layer() {
layer.delete_resident_layer_file()?;
let layer_file_size = layer.file_size();
self.metrics
@@ -2443,7 +2427,13 @@ impl Timeline {
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
// If it's a remote layer, download it and retry.
if let Some(layer) = self.layer_cache.get(&layer.filename()) {
if let Some(remote_layer) =
super::storage_layer::downcast_remote_layer(&layer)
{
// TODO: push a breadcrumb to 'traversal_path' to record the fact that
// we downloaded / would need to download this layer.
remote_layer // download happens outside the scope of `layers` guard object
} else {
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, lsn_floor);
@@ -2466,10 +2456,6 @@ impl Timeline {
}),
));
continue 'outer;
} else {
// TODO: push a breadcrumb to 'traversal_path' to record the fact that
// we downloaded / would need to download this layer.
layer // download happens outside the scope of `layers` guard object
}
} else if timeline.ancestor_timeline.is_some() {
// Nothing on this timeline. Traverse to parent
@@ -2741,7 +2727,7 @@ impl Timeline {
}
/// Flush one frozen in-memory layer to disk, as a new delta layer.
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.short_id()))]
#[instrument(skip(self, frozen_layer, ctx), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.short_id()))]
async fn flush_frozen_layer(
self: &Arc<Self>,
frozen_layer: Arc<InMemoryLayer>,
@@ -2765,14 +2751,9 @@ impl Timeline {
// normal case, write out a L0 delta layer file.
let this = self.clone();
let frozen_layer = frozen_layer.clone();
let span = tracing::info_span!("blocking");
let (delta_path, metadata) = tokio::task::spawn_blocking(move || {
let _g = span.entered();
this.create_delta_layer(&frozen_layer)
})
.await
.context("create_delta_layer spawn_blocking")
.and_then(|res| res)?;
let (delta_path, metadata) =
tokio::task::spawn_blocking(move || this.create_delta_layer(&frozen_layer))
.await??;
HashMap::from([(delta_path, metadata)])
};
@@ -2909,8 +2890,7 @@ impl Timeline {
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
batch_updates.insert_historic(Arc::new(l.layer_desc()));
self.layer_cache.insert(l.filename(), l);
batch_updates.insert_historic(l);
batch_updates.flush();
// update the timeline's physical size
@@ -3156,9 +3136,7 @@ impl Timeline {
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
updates.insert_historic(Arc::new(l.layer_desc()));
let x: Arc<dyn PersistentLayer> = l;
self.layer_cache.insert(x.filename(), x)
updates.insert_historic(l);
}
updates.flush();
drop(layers);
@@ -3171,7 +3149,7 @@ impl Timeline {
#[derive(Default)]
struct CompactLevel0Phase1Result {
new_layers: Vec<DeltaLayer>,
deltas_to_compact: Vec<Arc<RemoteLayerDesc>>,
deltas_to_compact: Vec<Arc<dyn PersistentLayer>>,
}
/// Top-level failure to compact.
@@ -3181,7 +3159,7 @@ enum CompactionError {
///
/// This should not happen repeatedly, but will be retried once by top-level
/// `Timeline::compact`.
DownloadRequired(Vec<Arc<RemoteLayerDesc>>),
DownloadRequired(Vec<Arc<RemoteLayer>>),
/// Compaction cannot be done right now; page reconstruction and so on.
Other(anyhow::Error),
}
@@ -3253,9 +3231,13 @@ impl Timeline {
let remotes = deltas_to_compact
.iter()
.filter(|l| !self.layer_cache.contains(&l.filename()))
.filter(|l| l.is_remote_layer())
.inspect(|l| info!("compact requires download of {}", l.filename().file_name()))
.cloned()
.map(|l| {
l.clone()
.downcast_remote_layer()
.expect("just checked it is remote layer")
})
.collect::<Vec<_>>();
if !remotes.is_empty() {
@@ -3540,18 +3522,14 @@ impl Timeline {
let this = self.clone();
let ctx_inner = ctx.clone();
let layer_removal_cs_inner = layer_removal_cs.clone();
let span = tracing::info_span!("blocking");
let CompactLevel0Phase1Result {
new_layers,
deltas_to_compact,
} = tokio::task::spawn_blocking(move || {
let _g = span.entered();
this.compact_level0_phase1(layer_removal_cs_inner, target_file_size, &ctx_inner)
})
.await
.context("compact_level0_phase1 spawn_blocking")
.map_err(CompactionError::Other)
.and_then(|res| res)?;
.unwrap()?;
if new_layers.is_empty() && deltas_to_compact.is_empty() {
// nothing to do
@@ -3595,15 +3573,13 @@ impl Timeline {
.add(metadata.len());
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
let remote_desc = l.layer_desc();
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
x.access_stats().record_residence_event(
&updates,
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
updates.insert_historic(Arc::new(remote_desc));
self.layer_cache.insert(x.filename(), x)
updates.insert_historic(x);
}
// Now that we have reshuffled the data to set of new delta layers, we can
@@ -4076,7 +4052,7 @@ impl Timeline {
#[instrument(skip_all, fields(layer=%remote_layer.short_id()))]
pub async fn download_remote_layer(
&self,
remote_layer: Arc<RemoteLayerDesc>,
remote_layer: Arc<RemoteLayer>,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_and_timeline_id();
@@ -4133,12 +4109,10 @@ impl Timeline {
// Delta- or ImageLayer in the layer map.
let mut layers = self_clone.layers.write().unwrap();
let mut updates = layers.batch_update();
let new_layer =
remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
{
use crate::tenant::layer_map::Replacement;
let l: Arc<dyn PersistentLayer> = remote_layer.clone();
/*
let failure = match updates.replace_historic(&l, new_layer) {
Ok(Replacement::Replaced { .. }) => false,
Ok(Replacement::NotFound) => {
@@ -4193,9 +4167,8 @@ impl Timeline {
remote_layer
.download_replacement_failure
.store(true, Relaxed);
} */
}
}
updates.flush();
drop(layers);
@@ -4208,10 +4181,7 @@ impl Timeline {
remote_layer.ongoing_download.close();
} else {
// Keep semaphore open. We'll drop the permit at the end of the function.
error!(
"layer file download failed: {:?}",
result.as_ref().unwrap_err()
);
error!("layer file download failed: {:?}", result.as_ref().unwrap_err());
}
// Don't treat it as an error if the task that triggered the download
@@ -4225,8 +4195,7 @@ impl Timeline {
drop(permit);
Ok(())
}
.in_current_span(),
}.in_current_span(),
);
receiver.await.context("download task cancelled")?
@@ -4299,7 +4268,7 @@ impl Timeline {
let layers = self.layers.read().unwrap();
layers
.iter_historic_layers()
.filter(|l| !self.layer_cache.contains(&l.filename()))
.filter_map(|l| l.downcast_remote_layer())
.map(|l| self.download_remote_layer(l))
.for_each(|dl| downloads.push(dl))
}
@@ -4374,7 +4343,7 @@ pub struct DiskUsageEvictionInfo {
}
pub struct LocalLayerInfoForDiskUsageEviction {
pub layer: Arc<RemoteLayerDesc>,
pub layer: Arc<dyn PersistentLayer>,
pub last_activity_ts: SystemTime,
}
@@ -4408,7 +4377,7 @@ impl Timeline {
let file_size = l.file_size();
max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
if !self.layer_cache.contains(&l.filename()) {
if l.is_remote_layer() {
continue;
}

View File

@@ -29,7 +29,7 @@ use crate::{
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
storage_layer::{PersistentLayer, RemoteLayerDesc},
storage_layer::PersistentLayer,
LogicalSizeCalculationCause, Tenant,
},
};
@@ -184,11 +184,11 @@ impl Timeline {
// NB: all the checks can be invalidated as soon as we release the layer map lock.
// We don't want to hold the layer map lock during eviction.
// So, we just need to deal with this.
let candidates: Vec<Arc<RemoteLayerDesc>> = {
let candidates: Vec<Arc<dyn PersistentLayer>> = {
let layers = self.layers.read().unwrap();
let mut candidates = Vec::new();
for hist_layer in layers.iter_historic_layers() {
if !self.layer_cache.contains(&hist_layer.filename()) {
if hist_layer.is_remote_layer() {
continue;
}


@@ -19,10 +19,8 @@ use tokio::task::JoinHandle;
use tokio::{runtime, time::sleep};
use tracing::*;
use crate::metrics::BROKER_ITERATION_TIMELINES;
use crate::metrics::BROKER_PULLED_UPDATES;
use crate::metrics::BROKER_PUSHED_UPDATES;
use crate::metrics::BROKER_PUSH_ALL_UPDATES_SECONDS;
use crate::GlobalTimelines;
use crate::SafeKeeperConf;
@@ -63,14 +61,8 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
BROKER_PUSHED_UPDATES.inc();
}
let elapsed = now.elapsed();
BROKER_PUSH_ALL_UPDATES_SECONDS.observe(elapsed.as_secs_f64());
BROKER_ITERATION_TIMELINES.observe(active_tlis.len() as f64);
if elapsed > push_interval / 2 {
info!("broker push is too long, pushed {} timeline updates to broker in {:?}", active_tlis.len(), elapsed);
}
// Log duration every second. Should be about 10MB of logs per day.
info!("pushed {} timeline updates to broker in {:?}", active_tlis.len(), elapsed);
sleep(push_interval).await;
}
};


@@ -125,25 +125,6 @@ pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
)
.expect("Failed to register safekeeper_backup_errors_total counter")
});
pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"safekeeper_broker_push_update_seconds",
"Seconds to push all timeline updates to the broker",
DISK_WRITE_SECONDS_BUCKETS.to_vec()
)
.expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
});
pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[
1.0, 10.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0,
];
pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"safekeeper_broker_iteration_timelines",
"Count of timelines pushed to the broker in a single iteration",
TIMELINES_COUNT_BUCKETS.to_vec()
)
.expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
});
pub const LABEL_UNKNOWN: &str = "unknown";


@@ -156,9 +156,7 @@ class LLVM:
profdata: Path,
objects: List[str],
sources: List[str],
demangler: Optional[Path] = None,
output_file: Optional[Path] = None,
) -> None:
demangler: Optional[Path] = None) -> None:
cwd = self.cargo.cwd
objects = list(intersperse('-object', objects))
@@ -182,18 +180,14 @@ class LLVM:
*objects,
*sources,
]
if output_file is not None:
with output_file.open('w') as outfile:
subprocess.check_call(cmd, cwd=cwd, stdout=outfile)
else:
subprocess.check_call(cmd, cwd=cwd)
subprocess.check_call(cmd, cwd=cwd)
def cov_report(self, **kwargs) -> None:
self._cov(subcommand='report', **kwargs)
def cov_export(self, *, kind: str, output_file: Optional[Path], **kwargs) -> None:
def cov_export(self, *, kind: str, **kwargs) -> None:
extras = (f'-format={kind}', )
self._cov(subcommand='export', *extras, output_file=output_file, **kwargs)
self._cov(subcommand='export', *extras, **kwargs)
def cov_show(self, *, kind: str, output_dir: Optional[Path] = None, **kwargs) -> None:
extras = [f'-format={kind}']
@@ -289,12 +283,9 @@ class TextReport(Report):
self.llvm.cov_show(kind='text', **self._common_kwargs())
@dataclass
class LcovReport(Report):
output_file: Path
def generate(self) -> None:
self.llvm.cov_export(kind='lcov', output_file=self.output_file, **self._common_kwargs())
self.llvm.cov_export(kind='lcov', **self._common_kwargs())
@dataclass
@@ -484,7 +475,7 @@ class State:
'text':
lambda: TextReport(**params),
'lcov':
lambda: LcovReport(**params, output_file=self.report_dir / 'lcov.info'),
lambda: LcovReport(**params),
'summary':
lambda: SummaryReport(**params),
'github':


@@ -1621,8 +1621,6 @@ class NeonPageserver(PgProtocol):
".*Compaction failed, retrying in [^:]+: Cannot run compaction iteration on inactive tenant",
# these can happen anytime we do compactions from background task and shutdown pageserver
r".*ERROR.*ancestor timeline \S+ is being stopped",
# this is expected given our collaborative shutdown approach for the UploadQueue
".*Compaction failed, retrying in .*: queue is in state Stopped.*",
]
def start(


@@ -20,7 +20,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
".*is not active. Current state: Broken.*",
".*will not become active. Current state: Broken.*",
".*failed to load metadata.*",
".*load failed.*load local timeline.*",
".*could not load tenant.*load local timeline.*",
]
)


@@ -140,7 +140,7 @@ def test_remote_storage_backup_and_restore(
# This is before the failures injected by test_remote_failures, so it's a permanent error.
pageserver_http.configure_failpoints(("storage-sync-list-remote-timelines", "return"))
env.pageserver.allowed_errors.append(
".*attach failed.*: storage-sync-list-remote-timelines",
".*error attaching tenant: storage-sync-list-remote-timelines",
)
# Attach it. This HTTP request will succeed and launch a
# background task to load the tenant. In that background task,


@@ -647,9 +647,7 @@ def test_ignored_tenant_stays_broken_without_metadata(
metadata_removed = True
assert metadata_removed, f"Failed to find metadata file in {tenant_timeline_dir}"
env.pageserver.allowed_errors.append(
f".*{tenant_id}.*: load failed.*: failed to load metadata.*"
)
env.pageserver.allowed_errors.append(".*could not load tenant .*?: failed to load metadata.*")
# now, load it from the local files and expect it to be broken due to inability to load tenant files into memory
pageserver_http.tenant_load(tenant_id=tenant_id)


@@ -22,7 +22,6 @@ from fixtures.neon_fixtures import (
available_remote_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until
from prometheus_client.samples import Sample
@@ -309,7 +308,9 @@ def test_pageserver_with_empty_tenants(
env.pageserver.allowed_errors.append(
".*marking .* as locally complete, while it doesnt exist in remote index.*"
)
env.pageserver.allowed_errors.append(".*load failed.*list timelines directory.*")
env.pageserver.allowed_errors.append(
".*could not load tenant.*Failed to list timelines directory.*"
)
client = env.pageserver.http_client()
@@ -340,16 +341,10 @@ def test_pageserver_with_empty_tenants(
env.pageserver.start()
client = env.pageserver.http_client()
def not_loading():
tenants = client.tenant_list()
assert len(tenants) == 2
assert all(t["state"]["slug"] != "Loading" for t in tenants)
wait_until(10, 0.2, not_loading)
tenants = client.tenant_list()
assert len(tenants) == 2
[broken_tenant] = [t for t in tenants if t["id"] == str(tenant_without_timelines_dir)]
assert (
broken_tenant["state"]["slug"] == "Broken"
@@ -360,7 +355,7 @@ def test_pageserver_with_empty_tenants(
broken_tenant_status["state"]["slug"] == "Broken"
), f"Tenant {tenant_without_timelines_dir} without timelines dir should be broken"
assert env.pageserver.log_contains(".*load failed, setting tenant state to Broken:.*")
assert env.pageserver.log_contains(".*Setting tenant as Broken state, reason:.*")
[loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines)]
assert (