mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-23 13:20:37 +00:00
Compare commits
14 Commits
release-pr
...
jemalloc-p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7d2709f4a1 | ||
|
|
6e2c04bc48 | ||
|
|
76ae735a24 | ||
|
|
7be445f627 | ||
|
|
35e9fb360b | ||
|
|
0d21187322 | ||
|
|
e8a98adcd0 | ||
|
|
98be8b9430 | ||
|
|
6eb946e2de | ||
|
|
681a04d287 | ||
|
|
3df67bf4d7 | ||
|
|
0d8e68003a | ||
|
|
637ad4a638 | ||
|
|
8d0f701767 |
6
.github/workflows/build_and_test.yml
vendored
6
.github/workflows/build_and_test.yml
vendored
@@ -735,7 +735,7 @@ jobs:
|
||||
run: |
|
||||
mkdir -p .docker-custom
|
||||
echo DOCKER_CONFIG=$(pwd)/.docker-custom >> $GITHUB_ENV
|
||||
- uses: docker/setup-buildx-action@v3
|
||||
- uses: docker/setup-buildx-action@v2
|
||||
|
||||
- uses: docker/login-action@v3
|
||||
with:
|
||||
@@ -792,7 +792,7 @@ jobs:
|
||||
run: |
|
||||
mkdir -p .docker-custom
|
||||
echo DOCKER_CONFIG=$(pwd)/.docker-custom >> $GITHUB_ENV
|
||||
- uses: docker/setup-buildx-action@v3
|
||||
- uses: docker/setup-buildx-action@v2
|
||||
with:
|
||||
# Disable parallelism for docker buildkit.
|
||||
# As we already build everything with `make -j$(nproc)`, running it in additional level of parallelisam blows up the Runner.
|
||||
@@ -865,7 +865,7 @@ jobs:
|
||||
run:
|
||||
shell: sh -eu {0}
|
||||
env:
|
||||
VM_BUILDER_VERSION: v0.23.2
|
||||
VM_BUILDER_VERSION: v0.28.1
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
|
||||
32
Cargo.lock
generated
32
Cargo.lock
generated
@@ -599,7 +599,7 @@ dependencies = [
|
||||
"once_cell",
|
||||
"pin-project-lite",
|
||||
"pin-utils",
|
||||
"rustls 0.21.9",
|
||||
"rustls 0.21.11",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
@@ -2519,7 +2519,7 @@ dependencies = [
|
||||
"http 0.2.9",
|
||||
"hyper 0.14.26",
|
||||
"log",
|
||||
"rustls 0.21.9",
|
||||
"rustls 0.21.11",
|
||||
"rustls-native-certs 0.6.2",
|
||||
"tokio",
|
||||
"tokio-rustls 0.24.0",
|
||||
@@ -4059,7 +4059,7 @@ dependencies = [
|
||||
"futures",
|
||||
"once_cell",
|
||||
"pq_proto",
|
||||
"rustls 0.22.2",
|
||||
"rustls 0.22.4",
|
||||
"rustls-pemfile 2.1.1",
|
||||
"serde",
|
||||
"thiserror",
|
||||
@@ -4350,7 +4350,7 @@ dependencies = [
|
||||
"routerify",
|
||||
"rstest",
|
||||
"rustc-hash",
|
||||
"rustls 0.22.2",
|
||||
"rustls 0.22.4",
|
||||
"rustls-pemfile 2.1.1",
|
||||
"scopeguard",
|
||||
"serde",
|
||||
@@ -4542,7 +4542,7 @@ dependencies = [
|
||||
"itoa",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"rustls 0.22.2",
|
||||
"rustls 0.22.4",
|
||||
"rustls-native-certs 0.7.0",
|
||||
"rustls-pemfile 2.1.1",
|
||||
"rustls-pki-types",
|
||||
@@ -4696,7 +4696,7 @@ dependencies = [
|
||||
"once_cell",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"rustls 0.21.9",
|
||||
"rustls 0.21.11",
|
||||
"rustls-pemfile 1.0.2",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -4956,9 +4956,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rustls"
|
||||
version = "0.21.9"
|
||||
version = "0.21.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "629648aced5775d558af50b2b4c7b02983a04b312126d45eeead26e7caa498b9"
|
||||
checksum = "7fecbfb7b1444f477b345853b1fce097a2c6fb637b2bfb87e6bc5db0f043fae4"
|
||||
dependencies = [
|
||||
"log",
|
||||
"ring 0.17.6",
|
||||
@@ -4968,9 +4968,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rustls"
|
||||
version = "0.22.2"
|
||||
version = "0.22.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41"
|
||||
checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432"
|
||||
dependencies = [
|
||||
"log",
|
||||
"ring 0.17.6",
|
||||
@@ -5282,7 +5282,7 @@ checksum = "2e95efd0cefa32028cdb9766c96de71d96671072f9fb494dc9fb84c0ef93e52b"
|
||||
dependencies = [
|
||||
"httpdate",
|
||||
"reqwest",
|
||||
"rustls 0.21.9",
|
||||
"rustls 0.21.11",
|
||||
"sentry-backtrace",
|
||||
"sentry-contexts",
|
||||
"sentry-core",
|
||||
@@ -6193,7 +6193,7 @@ checksum = "0ea13f22eda7127c827983bdaf0d7fff9df21c8817bab02815ac277a21143677"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"ring 0.17.6",
|
||||
"rustls 0.22.2",
|
||||
"rustls 0.22.4",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"tokio-rustls 0.25.0",
|
||||
@@ -6206,7 +6206,7 @@ version = "0.24.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e0d409377ff5b1e3ca6437aa86c1eb7d40c134bfec254e44c830defa92669db5"
|
||||
dependencies = [
|
||||
"rustls 0.21.9",
|
||||
"rustls 0.21.11",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
@@ -6216,7 +6216,7 @@ version = "0.25.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f"
|
||||
dependencies = [
|
||||
"rustls 0.22.2",
|
||||
"rustls 0.22.4",
|
||||
"rustls-pki-types",
|
||||
"tokio",
|
||||
]
|
||||
@@ -6677,7 +6677,7 @@ dependencies = [
|
||||
"base64 0.21.1",
|
||||
"log",
|
||||
"once_cell",
|
||||
"rustls 0.21.9",
|
||||
"rustls 0.21.11",
|
||||
"rustls-webpki 0.100.2",
|
||||
"url",
|
||||
"webpki-roots 0.23.1",
|
||||
@@ -7354,7 +7354,7 @@ dependencies = [
|
||||
"regex-automata 0.4.3",
|
||||
"regex-syntax 0.8.2",
|
||||
"reqwest",
|
||||
"rustls 0.21.9",
|
||||
"rustls 0.21.11",
|
||||
"scopeguard",
|
||||
"serde",
|
||||
"serde_json",
|
||||
|
||||
@@ -252,7 +252,7 @@ debug = true
|
||||
|
||||
# disable debug symbols for all packages except this one to decrease binaries size
|
||||
[profile.release.package."*"]
|
||||
debug = false
|
||||
debug = true
|
||||
|
||||
[profile.release-line-debug]
|
||||
inherits = "release"
|
||||
|
||||
@@ -44,6 +44,7 @@ COPY --from=pg-build /home/nonroot/pg_install/v15/include/postgresql/server pg_i
|
||||
COPY --from=pg-build /home/nonroot/pg_install/v16/include/postgresql/server pg_install/v16/include/postgresql/server
|
||||
COPY --chown=nonroot . .
|
||||
|
||||
ENV _RJEM_MALLOC_CONF="prof:true"
|
||||
# Show build caching stats to check if it was used in the end.
|
||||
# Has to be the part of the same RUN since cachepot daemon is killed in the end of this RUN, losing the compilation stats.
|
||||
RUN set -e \
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
use std::{collections::HashMap, str::FromStr};
|
||||
use std::{collections::HashMap, str::FromStr, time::Duration};
|
||||
|
||||
use clap::{Parser, Subcommand};
|
||||
use hyper::Method;
|
||||
use hyper::{Method, StatusCode};
|
||||
use pageserver_api::{
|
||||
controller_api::{
|
||||
NodeAvailabilityWrapper, NodeDescribeResponse, ShardSchedulingPolicy,
|
||||
TenantDescribeResponse, TenantPolicyRequest,
|
||||
},
|
||||
models::{
|
||||
ShardParameters, TenantConfig, TenantConfigRequest, TenantCreateRequest,
|
||||
TenantShardSplitRequest, TenantShardSplitResponse,
|
||||
LocationConfigSecondary, ShardParameters, TenantConfig, TenantConfigRequest,
|
||||
TenantCreateRequest, TenantShardSplitRequest, TenantShardSplitResponse,
|
||||
},
|
||||
shard::{ShardStripeSize, TenantShardId},
|
||||
};
|
||||
@@ -120,6 +120,12 @@ enum Command {
|
||||
#[arg(long)]
|
||||
tenant_id: TenantId,
|
||||
},
|
||||
/// For a tenant which hasn't been onboarded to the storage controller yet, add it in secondary
|
||||
/// mode so that it can warm up content on a pageserver.
|
||||
TenantWarmup {
|
||||
#[arg(long)]
|
||||
tenant_id: TenantId,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
@@ -581,6 +587,94 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
println!("{table}");
|
||||
}
|
||||
Command::TenantWarmup { tenant_id } => {
|
||||
let describe_response = storcon_client
|
||||
.dispatch::<(), TenantDescribeResponse>(
|
||||
Method::GET,
|
||||
format!("control/v1/tenant/{tenant_id}"),
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
match describe_response {
|
||||
Ok(describe) => {
|
||||
if matches!(describe.policy, PlacementPolicy::Secondary) {
|
||||
// Fine: it's already known to controller in secondary mode: calling
|
||||
// again to put it into secondary mode won't cause problems.
|
||||
} else {
|
||||
anyhow::bail!("Tenant already present with policy {:?}", describe.policy);
|
||||
}
|
||||
}
|
||||
Err(mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _)) => {
|
||||
// Fine: this tenant isn't know to the storage controller yet.
|
||||
}
|
||||
Err(e) => {
|
||||
// Unexpected API error
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
|
||||
vps_client
|
||||
.location_config(
|
||||
TenantShardId::unsharded(tenant_id),
|
||||
pageserver_api::models::LocationConfig {
|
||||
mode: pageserver_api::models::LocationConfigMode::Secondary,
|
||||
generation: None,
|
||||
secondary_conf: Some(LocationConfigSecondary { warm: true }),
|
||||
shard_number: 0,
|
||||
shard_count: 0,
|
||||
shard_stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE.0,
|
||||
tenant_conf: TenantConfig::default(),
|
||||
},
|
||||
None,
|
||||
true,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let describe_response = storcon_client
|
||||
.dispatch::<(), TenantDescribeResponse>(
|
||||
Method::GET,
|
||||
format!("control/v1/tenant/{tenant_id}"),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let secondary_ps_id = describe_response
|
||||
.shards
|
||||
.first()
|
||||
.unwrap()
|
||||
.node_secondary
|
||||
.first()
|
||||
.unwrap();
|
||||
|
||||
println!("Tenant {tenant_id} warming up on pageserver {secondary_ps_id}");
|
||||
loop {
|
||||
let (status, progress) = vps_client
|
||||
.tenant_secondary_download(
|
||||
TenantShardId::unsharded(tenant_id),
|
||||
Some(Duration::from_secs(10)),
|
||||
)
|
||||
.await?;
|
||||
println!(
|
||||
"Progress: {}/{} layers, {}/{} bytes",
|
||||
progress.layers_downloaded,
|
||||
progress.layers_total,
|
||||
progress.bytes_downloaded,
|
||||
progress.bytes_total
|
||||
);
|
||||
match status {
|
||||
StatusCode::OK => {
|
||||
println!("Download complete");
|
||||
break;
|
||||
}
|
||||
StatusCode::ACCEPTED => {
|
||||
// Loop
|
||||
}
|
||||
_ => {
|
||||
anyhow::bail!("Unexpected download status: {status}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
150
docs/storage_controller.md
Normal file
150
docs/storage_controller.md
Normal file
@@ -0,0 +1,150 @@
|
||||
# Storage Controller
|
||||
|
||||
## Concepts
|
||||
|
||||
The storage controller sits between administrative API clients and pageservers, and handles the details of mapping tenants to pageserver tenant shards. For example, creating a tenant is one API call to the storage controller,
|
||||
which is mapped into many API calls to many pageservers (for multiple shards, and for secondary locations).
|
||||
|
||||
It implements a pageserver-compatible API that may be used for CRUD operations on tenants and timelines, translating these requests into appropriate operations on the shards within a tenant, which may be on many different pageservers. Using this API, the storage controller may be used in the same way as the pageserver's administrative HTTP API, hiding
|
||||
the underlying details of how data is spread across multiple nodes.
|
||||
|
||||
The storage controller also manages generations, high availability (via secondary locations) and live migrations for tenants under its management. This is done with a reconciliation loop pattern, where tenants have an “intent” state and a “reconcile” task that tries to make the outside world match the intent.
|
||||
|
||||
## APIs
|
||||
|
||||
The storage controller’s HTTP server implements four logically separate APIs:
|
||||
|
||||
- `/v1/...` path is the pageserver-compatible API. This has to be at the path root because that’s where clients expect to find it on a pageserver.
|
||||
- `/control/v1/...` path is the storage controller’s API, which enables operations such as registering and management pageservers, or executing shard splits.
|
||||
- `/debug/v1/...` path contains endpoints which are either exclusively used in tests, or are for use by engineers when supporting a deployed system.
|
||||
- `/upcall/v1/...` path contains endpoints that are called by pageservers. This includes the `/re-attach` and `/validate` APIs used by pageservers
|
||||
to ensure data safety with generation numbers.
|
||||
|
||||
The API is authenticated with a JWT token, and tokens must have scope `pageserverapi` (i.e. the same scope as pageservers’ APIs).
|
||||
|
||||
See the `http.rs` file in the source for where the HTTP APIs are implemented.
|
||||
|
||||
## Database
|
||||
|
||||
The storage controller uses a postgres database to persist a subset of its state. Note that the storage controller does _not_ keep all its state in the database: this is a design choice to enable most operations to be done efficiently in memory, rather than having to read from the database. See `persistence.rs` for a more comprehensive comment explaining what we do and do not persist: a useful metaphor is that we persist objects like tenants and nodes, but we do not
|
||||
persist the _relationships_ between them: the attachment state of a tenant's shards to nodes is kept in memory and
|
||||
rebuilt on startup.
|
||||
|
||||
The file `[persistence.rs](http://persistence.rs)` contains all the code for accessing the database, and has a large doc comment that goes into more detail about exactly what we persist and why.
|
||||
|
||||
The `diesel` crate is used for defining models & migrations.
|
||||
|
||||
Running a local cluster with `cargo neon` automatically starts a vanilla postgress process to host the storage controller’s database.
|
||||
|
||||
### Diesel tip: migrations
|
||||
|
||||
If you need to modify the database schema, here’s how to create a migration:
|
||||
|
||||
- Install the diesel CLI with `cargo install diesel_cli`
|
||||
- Use `diesel migration generate <name>` to create a new migration
|
||||
- Populate the SQL files in the `migrations/` subdirectory
|
||||
- Use `DATABASE_URL=... diesel migration run` to apply the migration you just wrote: this will update the `[schema.rs](http://schema.rs)` file automatically.
|
||||
- This requires a running database: the easiest way to do that is to just run `cargo neon init ; cargo neon start`, which will leave a database available at `postgresql://localhost:1235/attachment_service`
|
||||
- Commit the migration files and the changes to schema.rs
|
||||
- If you need to iterate, you can rewind migrations with `diesel migration revert -a` and then `diesel migration run` again.
|
||||
- The migrations are build into the storage controller binary, and automatically run at startup after it is deployed, so once you’ve committed a migration no further steps are needed.
|
||||
|
||||
## storcon_cli
|
||||
|
||||
The `storcon_cli` tool enables interactive management of the storage controller. This is usually
|
||||
only necessary for debug, but may also be used to manage nodes (e.g. marking a node as offline).
|
||||
|
||||
`storcon_cli --help` includes details on commands.
|
||||
|
||||
# Deploying
|
||||
|
||||
This section is aimed at engineers deploying the storage controller outside of Neon's cloud platform, as
|
||||
part of a self-hosted system.
|
||||
|
||||
_General note: since the default `neon_local` environment includes a storage controller, this is a useful
|
||||
reference when figuring out deployment._
|
||||
|
||||
## Database
|
||||
|
||||
It is **essential** that the database used by the storage controller is durable (**do not store it on ephemeral
|
||||
local disk**). This database contains pageserver generation numbers, which are essential to data safety on the pageserver.
|
||||
|
||||
The resource requirements for the database are very low: a single CPU core and 1GiB of memory should work well for most deployments. The physical size of the database is typically under a gigabyte.
|
||||
|
||||
Set the URL to the database using the `--database-url` CLI option.
|
||||
|
||||
There is no need to run migrations manually: the storage controller automatically applies migrations
|
||||
when it starts up.
|
||||
|
||||
## Configure pageservers to use the storage controller
|
||||
|
||||
1. The pageserver `control_plane_api` and `control_plane_api_token` should be set in the `pageserver.toml` file. The API setting should
|
||||
point to the "upcall" prefix, for example `http://127.0.0.1:1234/upcall/v1/` is used in neon_local clusters.
|
||||
2. Create a `metadata.json` file in the same directory as `pageserver.toml`: this enables the pageserver to automatically register itself
|
||||
with the storage controller when it starts up. See the example below for the format of this file.
|
||||
|
||||
### Example `metadata.json`
|
||||
|
||||
```
|
||||
{"host":"acmehost.localdomain","http_host":"acmehost.localdomain","http_port":9898,"port":64000}
|
||||
```
|
||||
|
||||
- `port` and `host` refer to the _postgres_ port and host, and these must be accessible from wherever
|
||||
postgres runs.
|
||||
- `http_port` and `http_host` refer to the pageserver's HTTP api, this must be accessible from where
|
||||
the storage controller runs.
|
||||
|
||||
## Handle compute notifications.
|
||||
|
||||
The storage controller independently moves tenant attachments between pageservers in response to
|
||||
changes such as a pageserver node becoming unavailable, or the tenant's shard count changing. To enable
|
||||
postgres clients to handle such changes, the storage controller calls an API hook when a tenant's pageserver
|
||||
location changes.
|
||||
|
||||
The hook is configured using the storage controller's `--compute-hook-url` CLI option. If the hook requires
|
||||
JWT auth, the token may be provided with `--control-plane-jwt-token`. The hook will be invoked with a `PUT` request.
|
||||
|
||||
In the Neon cloud service, this hook is implemented by Neon's internal cloud control plane. In `neon_local` systems
|
||||
the storage controller integrates directly with neon_local to reconfigure local postgres processes instead of calling
|
||||
the compute hook.
|
||||
|
||||
When implementing an on-premise Neon deployment, you must implement a service that handles the compute hook. This is not complicated:
|
||||
the request body has format of the `ComputeHookNotifyRequest` structure, provided below for convenience.
|
||||
|
||||
```
|
||||
struct ComputeHookNotifyRequestShard {
|
||||
node_id: NodeId,
|
||||
shard_number: ShardNumber,
|
||||
}
|
||||
|
||||
struct ComputeHookNotifyRequest {
|
||||
tenant_id: TenantId,
|
||||
stripe_size: Option<ShardStripeSize>,
|
||||
shards: Vec<ComputeHookNotifyRequestShard>,
|
||||
}
|
||||
```
|
||||
|
||||
When a notification is received:
|
||||
|
||||
1. Modify postgres configuration for this tenant:
|
||||
|
||||
- set `neon.pageserver_connstr` to a comma-separated list of postgres connection strings to pageservers according to the `shards` list. The
|
||||
shards identified by `NodeId` must be converted to the address+port of the node.
|
||||
- if stripe_size is not None, set `neon.stripe_size` to this value
|
||||
|
||||
2. Send SIGHUP to postgres to reload configuration
|
||||
3. Respond with 200 to the notification request. Do not return success if postgres was not updated: if an error is returned, the controller
|
||||
will retry the notification until it succeeds..
|
||||
|
||||
### Example notification body
|
||||
|
||||
```
|
||||
{
|
||||
"tenant_id": "1f359dd625e519a1a4e8d7509690f6fc",
|
||||
"stripe_size": 32768,
|
||||
"shards": [
|
||||
{"node_id": 344, "shard_number": 0},
|
||||
{"node_id": 722, "shard_number": 1},
|
||||
],
|
||||
}
|
||||
```
|
||||
@@ -192,6 +192,14 @@ impl<T> OnceCell<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Like [`Guard::take_and_deinit`], but will return `None` if this OnceCell was never
|
||||
/// initialized.
|
||||
pub fn take_and_deinit(&mut self) -> Option<(T, InitPermit)> {
|
||||
let inner = self.inner.get_mut().unwrap();
|
||||
|
||||
inner.take_and_deinit()
|
||||
}
|
||||
|
||||
/// Return the number of [`Self::get_or_init`] calls waiting for initialization to complete.
|
||||
pub fn initializer_count(&self) -> usize {
|
||||
self.initializers.load(Ordering::Relaxed)
|
||||
@@ -246,15 +254,23 @@ impl<'a, T> Guard<'a, T> {
|
||||
/// The permit will be on a semaphore part of the new internal value, and any following
|
||||
/// [`OnceCell::get_or_init`] will wait on it to complete.
|
||||
pub fn take_and_deinit(mut self) -> (T, InitPermit) {
|
||||
self.0
|
||||
.take_and_deinit()
|
||||
.expect("guard is not created unless value has been initialized")
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Inner<T> {
|
||||
pub fn take_and_deinit(&mut self) -> Option<(T, InitPermit)> {
|
||||
let value = self.value.take()?;
|
||||
|
||||
let mut swapped = Inner::default();
|
||||
let sem = swapped.init_semaphore.clone();
|
||||
// acquire and forget right away, moving the control over to InitPermit
|
||||
sem.try_acquire().expect("we just created this").forget();
|
||||
std::mem::swap(&mut *self.0, &mut swapped);
|
||||
swapped
|
||||
.value
|
||||
.map(|v| (v, InitPermit(sem)))
|
||||
.expect("guard is not created unless value has been initialized")
|
||||
let permit = InitPermit(sem);
|
||||
std::mem::swap(self, &mut swapped);
|
||||
Some((value, permit))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -263,6 +279,13 @@ impl<'a, T> Guard<'a, T> {
|
||||
/// On drop, this type will return the permit.
|
||||
pub struct InitPermit(Arc<tokio::sync::Semaphore>);
|
||||
|
||||
impl std::fmt::Debug for InitPermit {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let ptr = Arc::as_ptr(&self.0) as *const ();
|
||||
f.debug_tuple("InitPermit").field(&ptr).finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for InitPermit {
|
||||
fn drop(&mut self) {
|
||||
assert_eq!(
|
||||
@@ -559,4 +582,22 @@ mod tests {
|
||||
|
||||
assert_eq!(*target.get().unwrap(), 11);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn take_and_deinit_on_mut() {
|
||||
use std::convert::Infallible;
|
||||
|
||||
let mut target = OnceCell::<u32>::default();
|
||||
assert!(target.take_and_deinit().is_none());
|
||||
|
||||
target
|
||||
.get_or_init(|permit| async move { Ok::<_, Infallible>((42, permit)) })
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let again = target.take_and_deinit();
|
||||
assert!(matches!(again, Some((42, _))), "{again:?}");
|
||||
|
||||
assert!(target.take_and_deinit().is_none());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1518,7 +1518,8 @@ pub(crate) struct SecondaryModeMetrics {
|
||||
pub(crate) download_heatmap: IntCounter,
|
||||
pub(crate) download_layer: IntCounter,
|
||||
}
|
||||
pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| SecondaryModeMetrics {
|
||||
pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| {
|
||||
SecondaryModeMetrics {
|
||||
upload_heatmap: register_int_counter!(
|
||||
"pageserver_secondary_upload_heatmap",
|
||||
"Number of heatmaps written to remote storage by attached tenants"
|
||||
@@ -1536,7 +1537,7 @@ pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| Seco
|
||||
.expect("failed to define a metric"),
|
||||
download_heatmap: register_int_counter!(
|
||||
"pageserver_secondary_download_heatmap",
|
||||
"Number of downloads of heatmaps by secondary mode locations"
|
||||
"Number of downloads of heatmaps by secondary mode locations, including when it hasn't changed"
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
download_layer: register_int_counter!(
|
||||
@@ -1544,6 +1545,7 @@ pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| Seco
|
||||
"Number of downloads of layers by secondary mode locations"
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
}
|
||||
});
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
|
||||
@@ -33,6 +33,52 @@ impl Value {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub(crate) enum InvalidInput {
|
||||
TooShortValue,
|
||||
TooShortPostgresRecord,
|
||||
}
|
||||
|
||||
/// We could have a ValueRef where everything is `serde(borrow)`. Before implementing that, lets
|
||||
/// use this type for querying if a slice looks some particular way.
|
||||
#[cfg(test)]
|
||||
pub(crate) struct ValueBytes;
|
||||
|
||||
#[cfg(test)]
|
||||
impl ValueBytes {
|
||||
pub(crate) fn will_init(raw: &[u8]) -> Result<bool, InvalidInput> {
|
||||
if raw.len() < 12 {
|
||||
return Err(InvalidInput::TooShortValue);
|
||||
}
|
||||
|
||||
let value_discriminator = &raw[0..4];
|
||||
|
||||
if value_discriminator == [0, 0, 0, 0] {
|
||||
// Value::Image always initializes
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
if value_discriminator != [0, 0, 0, 1] {
|
||||
// not a Value::WalRecord(..)
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let walrecord_discriminator = &raw[4..8];
|
||||
|
||||
if walrecord_discriminator != [0, 0, 0, 0] {
|
||||
// only NeonWalRecord::Postgres can have will_init
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
if raw.len() < 17 {
|
||||
return Err(InvalidInput::TooShortPostgresRecord);
|
||||
}
|
||||
|
||||
Ok(raw[8] == 1)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
@@ -70,6 +116,8 @@ mod test {
|
||||
];
|
||||
|
||||
roundtrip!(image, expected);
|
||||
|
||||
assert!(ValueBytes::will_init(&expected).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -93,6 +141,96 @@ mod test {
|
||||
];
|
||||
|
||||
roundtrip!(rec, expected);
|
||||
|
||||
assert!(ValueBytes::will_init(&expected).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bytes_inspection_too_short_image() {
|
||||
let rec = Value::Image(Bytes::from_static(b""));
|
||||
|
||||
#[rustfmt::skip]
|
||||
let expected = [
|
||||
// top level discriminator of 4 bytes
|
||||
0x00, 0x00, 0x00, 0x00,
|
||||
// 8 byte length
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
];
|
||||
|
||||
roundtrip!(rec, expected);
|
||||
|
||||
assert!(ValueBytes::will_init(&expected).unwrap());
|
||||
assert_eq!(expected.len(), 12);
|
||||
for len in 0..12 {
|
||||
assert_eq!(
|
||||
ValueBytes::will_init(&expected[..len]).unwrap_err(),
|
||||
InvalidInput::TooShortValue
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bytes_inspection_too_short_postgres_record() {
|
||||
let rec = NeonWalRecord::Postgres {
|
||||
will_init: false,
|
||||
rec: Bytes::from_static(b""),
|
||||
};
|
||||
let rec = Value::WalRecord(rec);
|
||||
|
||||
#[rustfmt::skip]
|
||||
let expected = [
|
||||
// flattened discriminator of total 8 bytes
|
||||
0x00, 0x00, 0x00, 0x01,
|
||||
0x00, 0x00, 0x00, 0x00,
|
||||
// will_init
|
||||
0x00,
|
||||
// 8 byte length
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
];
|
||||
|
||||
roundtrip!(rec, expected);
|
||||
|
||||
assert!(!ValueBytes::will_init(&expected).unwrap());
|
||||
assert_eq!(expected.len(), 17);
|
||||
for len in 12..17 {
|
||||
assert_eq!(
|
||||
ValueBytes::will_init(&expected[..len]).unwrap_err(),
|
||||
InvalidInput::TooShortPostgresRecord
|
||||
)
|
||||
}
|
||||
for len in 0..12 {
|
||||
assert_eq!(
|
||||
ValueBytes::will_init(&expected[..len]).unwrap_err(),
|
||||
InvalidInput::TooShortValue
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn clear_visibility_map_flags_example() {
|
||||
let rec = NeonWalRecord::ClearVisibilityMapFlags {
|
||||
new_heap_blkno: Some(0x11),
|
||||
old_heap_blkno: None,
|
||||
flags: 0x03,
|
||||
};
|
||||
let rec = Value::WalRecord(rec);
|
||||
|
||||
#[rustfmt::skip]
|
||||
let expected = [
|
||||
// discriminators
|
||||
0x00, 0x00, 0x00, 0x01,
|
||||
0x00, 0x00, 0x00, 0x01,
|
||||
// Some == 1 followed by 4 bytes
|
||||
0x01, 0x00, 0x00, 0x00, 0x11,
|
||||
// None == 0
|
||||
0x00,
|
||||
// flags
|
||||
0x03
|
||||
];
|
||||
|
||||
roundtrip!(rec, expected);
|
||||
|
||||
assert!(!ValueBytes::will_init(&expected).unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3848,6 +3848,8 @@ pub(crate) mod harness {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use super::*;
|
||||
use crate::keyspace::KeySpaceAccum;
|
||||
use crate::repository::{Key, Value};
|
||||
@@ -3858,7 +3860,7 @@ mod tests {
|
||||
use hex_literal::hex;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use rand::{thread_rng, Rng};
|
||||
use tests::timeline::ShutdownMode;
|
||||
use tests::timeline::{GetVectoredError, ShutdownMode};
|
||||
|
||||
static TEST_KEY: Lazy<Key> =
|
||||
Lazy::new(|| Key::from_slice(&hex!("010000000033333333444444445500000001")));
|
||||
@@ -4794,6 +4796,166 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Test that vectored get descends into ancestor timelines correctly and
|
||||
// does not return an image that's newer than requested.
|
||||
//
|
||||
// The diagram below ilustrates an interesting case. We have a parent timeline
|
||||
// (top of the Lsn range) and a child timeline. The request key cannot be reconstructed
|
||||
// from the child timeline, so the parent timeline must be visited. When advacing into
|
||||
// the child timeline, the read path needs to remember what the requested Lsn was in
|
||||
// order to avoid returning an image that's too new. The test below constructs such
|
||||
// a timeline setup and does a few queries around the Lsn of each page image.
|
||||
// ```
|
||||
// LSN
|
||||
// ^
|
||||
// |
|
||||
// |
|
||||
// 500 | --------------------------------------> branch point
|
||||
// 400 | X
|
||||
// 300 | X
|
||||
// 200 | --------------------------------------> requested lsn
|
||||
// 100 | X
|
||||
// |---------------------------------------> Key
|
||||
// |
|
||||
// ------> requested key
|
||||
//
|
||||
// Legend:
|
||||
// * X - page images
|
||||
// ```
|
||||
#[tokio::test]
|
||||
async fn test_get_vectored_ancestor_descent() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_get_vectored_on_lsn_axis")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let start_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
|
||||
let end_key = start_key.add(1000);
|
||||
let child_gap_at_key = start_key.add(500);
|
||||
let mut parent_gap_lsns: BTreeMap<Lsn, String> = BTreeMap::new();
|
||||
|
||||
let mut current_lsn = Lsn(0x10);
|
||||
|
||||
let timeline_id = TimelineId::generate();
|
||||
let parent_timeline = tenant
|
||||
.create_test_timeline(timeline_id, current_lsn, DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
|
||||
current_lsn += 0x100;
|
||||
|
||||
for _ in 0..3 {
|
||||
let mut key = start_key;
|
||||
while key < end_key {
|
||||
current_lsn += 0x10;
|
||||
|
||||
let image_value = format!("{} at {}", child_gap_at_key, current_lsn);
|
||||
|
||||
let mut writer = parent_timeline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
key,
|
||||
current_lsn,
|
||||
&Value::Image(test_img(&image_value)),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
writer.finish_write(current_lsn);
|
||||
|
||||
if key == child_gap_at_key {
|
||||
parent_gap_lsns.insert(current_lsn, image_value);
|
||||
}
|
||||
|
||||
key = key.next();
|
||||
}
|
||||
|
||||
parent_timeline.freeze_and_flush().await?;
|
||||
}
|
||||
|
||||
let child_timeline_id = TimelineId::generate();
|
||||
|
||||
let child_timeline = tenant
|
||||
.branch_timeline_test(&parent_timeline, child_timeline_id, Some(current_lsn), &ctx)
|
||||
.await?;
|
||||
|
||||
let mut key = start_key;
|
||||
while key < end_key {
|
||||
if key == child_gap_at_key {
|
||||
key = key.next();
|
||||
continue;
|
||||
}
|
||||
|
||||
current_lsn += 0x10;
|
||||
|
||||
let mut writer = child_timeline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
key,
|
||||
current_lsn,
|
||||
&Value::Image(test_img(&format!("{} at {}", key, current_lsn))),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
writer.finish_write(current_lsn);
|
||||
|
||||
key = key.next();
|
||||
}
|
||||
|
||||
child_timeline.freeze_and_flush().await?;
|
||||
|
||||
let lsn_offsets: [i64; 5] = [-10, -1, 0, 1, 10];
|
||||
let mut query_lsns = Vec::new();
|
||||
for image_lsn in parent_gap_lsns.keys().rev() {
|
||||
for offset in lsn_offsets {
|
||||
query_lsns.push(Lsn(image_lsn
|
||||
.0
|
||||
.checked_add_signed(offset)
|
||||
.expect("Shouldn't overflow")));
|
||||
}
|
||||
}
|
||||
|
||||
for query_lsn in query_lsns {
|
||||
let results = child_timeline
|
||||
.get_vectored_impl(
|
||||
KeySpace {
|
||||
ranges: vec![child_gap_at_key..child_gap_at_key.next()],
|
||||
},
|
||||
query_lsn,
|
||||
&ctx,
|
||||
)
|
||||
.await;
|
||||
|
||||
let expected_item = parent_gap_lsns
|
||||
.iter()
|
||||
.rev()
|
||||
.find(|(lsn, _)| **lsn <= query_lsn);
|
||||
|
||||
info!(
|
||||
"Doing vectored read at LSN {}. Expecting image to be: {:?}",
|
||||
query_lsn, expected_item
|
||||
);
|
||||
|
||||
match expected_item {
|
||||
Some((_, img_value)) => {
|
||||
let key_results = results.expect("No vectored get error expected");
|
||||
let key_result = &key_results[&child_gap_at_key];
|
||||
let returned_img = key_result
|
||||
.as_ref()
|
||||
.expect("No page reconstruct error expected");
|
||||
|
||||
info!(
|
||||
"Vectored read at LSN {} returned image {}",
|
||||
query_lsn,
|
||||
std::str::from_utf8(returned_img)?
|
||||
);
|
||||
assert_eq!(*returned_img, test_img(img_value));
|
||||
}
|
||||
None => {
|
||||
assert!(matches!(results, Err(GetVectoredError::MissingKey(_))));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_random_updates() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_random_updates")?;
|
||||
|
||||
@@ -678,12 +678,19 @@ pub async fn init_tenant_mgr(
|
||||
}
|
||||
}
|
||||
}
|
||||
LocationMode::Secondary(secondary_conf) => TenantSlot::Secondary(SecondaryTenant::new(
|
||||
tenant_shard_id,
|
||||
shard_identity,
|
||||
location_conf.tenant_conf,
|
||||
&secondary_conf,
|
||||
)),
|
||||
LocationMode::Secondary(secondary_conf) => {
|
||||
info!(
|
||||
tenant_id = %tenant_shard_id.tenant_id,
|
||||
shard_id = %tenant_shard_id.shard_slug(),
|
||||
"Starting secondary tenant"
|
||||
);
|
||||
TenantSlot::Secondary(SecondaryTenant::new(
|
||||
tenant_shard_id,
|
||||
shard_identity,
|
||||
location_conf.tenant_conf,
|
||||
&secondary_conf,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
tenants.insert(tenant_shard_id, slot);
|
||||
|
||||
@@ -312,7 +312,7 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
|
||||
(detail.last_download, detail.next_download.unwrap())
|
||||
};
|
||||
|
||||
if now < next_download {
|
||||
if now > next_download {
|
||||
Some(PendingDownload {
|
||||
secondary_state: secondary_tenant,
|
||||
last_download,
|
||||
@@ -647,6 +647,12 @@ impl<'a> TenantDownloader<'a> {
|
||||
progress.bytes_downloaded += layer_byte_count;
|
||||
progress.layers_downloaded += layer_count;
|
||||
}
|
||||
|
||||
for delete_timeline in &delete_timelines {
|
||||
// We haven't removed from disk yet, but optimistically remove from in-memory state: if removal
|
||||
// from disk fails that will be a fatal error.
|
||||
detail.timelines.remove(delete_timeline);
|
||||
}
|
||||
}
|
||||
|
||||
// Execute accumulated deletions
|
||||
@@ -710,13 +716,14 @@ impl<'a> TenantDownloader<'a> {
|
||||
.await
|
||||
.map_err(UpdateError::from)?;
|
||||
|
||||
SECONDARY_MODE.download_heatmap.inc();
|
||||
|
||||
if Some(&download.etag) == prev_etag {
|
||||
Ok(HeatMapDownload::Unmodified)
|
||||
} else {
|
||||
let mut heatmap_bytes = Vec::new();
|
||||
let mut body = tokio_util::io::StreamReader::new(download.download_stream);
|
||||
let _size = tokio::io::copy_buf(&mut body, &mut heatmap_bytes).await?;
|
||||
SECONDARY_MODE.download_heatmap.inc();
|
||||
Ok(HeatMapDownload::Modified(HeatMapModified {
|
||||
etag: download.etag,
|
||||
last_modified: download.last_modified,
|
||||
|
||||
@@ -20,8 +20,8 @@
|
||||
//! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
|
||||
//! ```
|
||||
//!
|
||||
//! Every delta file consists of three parts: "summary", "index", and
|
||||
//! "values". The summary is a fixed size header at the beginning of the file,
|
||||
//! Every delta file consists of three parts: "summary", "values", and
|
||||
//! "index". The summary is a fixed size header at the beginning of the file,
|
||||
//! and it contains basic information about the layer, and offsets to the other
|
||||
//! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
|
||||
//! "values" part. The actual page images and WAL records are stored in the
|
||||
@@ -863,7 +863,7 @@ impl DeltaLayerInner {
|
||||
.into(),
|
||||
);
|
||||
|
||||
let data_end_offset = self.index_start_blk as u64 * PAGE_SZ as u64;
|
||||
let data_end_offset = self.index_start_offset();
|
||||
|
||||
let reads = Self::plan_reads(
|
||||
keyspace,
|
||||
@@ -1103,11 +1103,195 @@ impl DeltaLayerInner {
|
||||
if let Some(last) = all_keys.last_mut() {
|
||||
// Last key occupies all space till end of value storage,
|
||||
// which corresponds to beginning of the index
|
||||
last.size = self.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
|
||||
last.size = self.index_start_offset() - last.size;
|
||||
}
|
||||
Ok(all_keys)
|
||||
}
|
||||
|
||||
/// Using the given writer, write out a truncated version, where LSNs higher than the
|
||||
/// truncate_at are missing.
|
||||
#[cfg(test)]
|
||||
pub(super) async fn copy_prefix(
|
||||
&self,
|
||||
writer: &mut DeltaLayerWriter,
|
||||
truncate_at: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
use crate::tenant::vectored_blob_io::{
|
||||
BlobMeta, VectoredReadBuilder, VectoredReadExtended,
|
||||
};
|
||||
use futures::stream::TryStreamExt;
|
||||
|
||||
#[derive(Debug)]
|
||||
enum Item {
|
||||
Actual(Key, Lsn, BlobRef),
|
||||
Sentinel,
|
||||
}
|
||||
|
||||
impl From<Item> for Option<(Key, Lsn, BlobRef)> {
|
||||
fn from(value: Item) -> Self {
|
||||
match value {
|
||||
Item::Actual(key, lsn, blob) => Some((key, lsn, blob)),
|
||||
Item::Sentinel => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Item {
|
||||
fn offset(&self) -> Option<BlobRef> {
|
||||
match self {
|
||||
Item::Actual(_, _, blob) => Some(*blob),
|
||||
Item::Sentinel => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn is_last(&self) -> bool {
|
||||
matches!(self, Item::Sentinel)
|
||||
}
|
||||
}
|
||||
|
||||
let block_reader = FileBlockReader::new(&self.file, self.file_id);
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
self.index_start_blk,
|
||||
self.index_root_blk,
|
||||
block_reader,
|
||||
);
|
||||
|
||||
let stream = self.stream_index_forwards(&tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
|
||||
let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
|
||||
// put in a sentinel value for getting the end offset for last item, and not having to
|
||||
// repeat the whole read part
|
||||
let stream = stream.chain(futures::stream::once(futures::future::ready(Ok(
|
||||
Item::Sentinel,
|
||||
))));
|
||||
let mut stream = std::pin::pin!(stream);
|
||||
|
||||
let mut prev: Option<(Key, Lsn, BlobRef)> = None;
|
||||
|
||||
let mut read_builder: Option<VectoredReadBuilder> = None;
|
||||
|
||||
let max_read_size = self
|
||||
.max_vectored_read_bytes
|
||||
.map(|x| x.0.get())
|
||||
.unwrap_or(8192);
|
||||
|
||||
let mut buffer = Some(BytesMut::with_capacity(max_read_size));
|
||||
|
||||
// FIXME: buffering of DeltaLayerWriter
|
||||
let mut per_blob_copy = Vec::new();
|
||||
|
||||
while let Some(item) = stream.try_next().await? {
|
||||
tracing::debug!(?item, "popped");
|
||||
let offset = item
|
||||
.offset()
|
||||
.unwrap_or(BlobRef::new(self.index_start_offset(), false));
|
||||
|
||||
let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
|
||||
let end_offset = offset;
|
||||
|
||||
Some((BlobMeta { key, lsn }, start_offset..end_offset))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let is_last = item.is_last();
|
||||
|
||||
prev = Option::from(item);
|
||||
|
||||
let actionable = actionable.filter(|x| x.0.lsn < truncate_at);
|
||||
|
||||
let builder = if let Some((meta, offsets)) = actionable {
|
||||
// extend or create a new builder
|
||||
if read_builder
|
||||
.as_mut()
|
||||
.map(|x| x.extend(offsets.start.pos(), offsets.end.pos(), meta))
|
||||
.unwrap_or(VectoredReadExtended::No)
|
||||
== VectoredReadExtended::Yes
|
||||
{
|
||||
None
|
||||
} else {
|
||||
read_builder.replace(VectoredReadBuilder::new(
|
||||
offsets.start.pos(),
|
||||
offsets.end.pos(),
|
||||
meta,
|
||||
max_read_size,
|
||||
))
|
||||
}
|
||||
} else {
|
||||
// nothing to do, except perhaps flush any existing for the last element
|
||||
None
|
||||
};
|
||||
|
||||
// flush the possible older builder and also the new one if the item was the last one
|
||||
let builders = builder.into_iter();
|
||||
let builders = if is_last {
|
||||
builders.chain(read_builder.take())
|
||||
} else {
|
||||
builders.chain(None)
|
||||
};
|
||||
|
||||
for builder in builders {
|
||||
let read = builder.build();
|
||||
|
||||
let reader = VectoredBlobReader::new(&self.file);
|
||||
|
||||
let mut buf = buffer.take().unwrap();
|
||||
|
||||
buf.clear();
|
||||
buf.reserve(read.size());
|
||||
let res = reader.read_blobs(&read, buf).await?;
|
||||
|
||||
for blob in res.blobs {
|
||||
let key = blob.meta.key;
|
||||
let lsn = blob.meta.lsn;
|
||||
let data = &res.buf[blob.start..blob.end];
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
Value::des(data)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"blob failed to deserialize for {}@{}, {}..{}: {:?}",
|
||||
blob.meta.key,
|
||||
blob.meta.lsn,
|
||||
blob.start,
|
||||
blob.end,
|
||||
utils::Hex(data)
|
||||
)
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// is it an image or will_init walrecord?
|
||||
// FIXME: this could be handled by threading the BlobRef to the
|
||||
// VectoredReadBuilder
|
||||
let will_init = crate::repository::ValueBytes::will_init(data)
|
||||
.inspect_err(|_e| {
|
||||
#[cfg(feature = "testing")]
|
||||
tracing::error!(data=?utils::Hex(data), err=?_e, "failed to parse will_init out of serialized value");
|
||||
})
|
||||
.unwrap_or(false);
|
||||
|
||||
per_blob_copy.clear();
|
||||
per_blob_copy.extend_from_slice(data);
|
||||
|
||||
let (tmp, res) = writer
|
||||
.put_value_bytes(key, lsn, std::mem::take(&mut per_blob_copy), will_init)
|
||||
.await;
|
||||
per_blob_copy = tmp;
|
||||
res?;
|
||||
}
|
||||
|
||||
buffer = Some(res.buf);
|
||||
}
|
||||
}
|
||||
|
||||
assert!(
|
||||
read_builder.is_none(),
|
||||
"with the sentinel above loop should had handled all"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
println!(
|
||||
"index_start_blk: {}, root {}",
|
||||
@@ -1177,6 +1361,44 @@ impl DeltaLayerInner {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn stream_index_forwards<'a, R>(
|
||||
&'a self,
|
||||
reader: &'a DiskBtreeReader<R, DELTA_KEY_SIZE>,
|
||||
start: &'a [u8; DELTA_KEY_SIZE],
|
||||
ctx: &'a RequestContext,
|
||||
) -> impl futures::stream::Stream<
|
||||
Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
|
||||
> + 'a
|
||||
where
|
||||
R: BlockReader,
|
||||
{
|
||||
use futures::stream::TryStreamExt;
|
||||
let stream = reader.get_stream_from(start, ctx);
|
||||
stream.map_ok(|(key, value)| {
|
||||
let key = DeltaKey::from_slice(&key);
|
||||
let (key, lsn) = (key.key(), key.lsn());
|
||||
let offset = BlobRef(value);
|
||||
|
||||
(key, lsn, offset)
|
||||
})
|
||||
}
|
||||
|
||||
/// The file offset to the first block of index.
|
||||
///
|
||||
/// The file structure is summary, values, and index. We often need this for the size of last blob.
|
||||
fn index_start_offset(&self) -> u64 {
|
||||
let offset = self.index_start_blk as u64 * PAGE_SZ as u64;
|
||||
let bref = BlobRef(offset);
|
||||
tracing::debug!(
|
||||
index_start_blk = self.index_start_blk,
|
||||
offset,
|
||||
pos = bref.pos(),
|
||||
"index_start_offset"
|
||||
);
|
||||
offset
|
||||
}
|
||||
}
|
||||
|
||||
/// A set of data associated with a delta layer key and its value
|
||||
@@ -1538,7 +1760,7 @@ mod test {
|
||||
|
||||
let resident = writer.finish(entries_meta.key_range.end, &timeline).await?;
|
||||
|
||||
let inner = resident.get_inner_delta(&ctx).await?;
|
||||
let inner = resident.as_delta(&ctx).await?;
|
||||
|
||||
let file_size = inner.file.metadata().await?.len();
|
||||
tracing::info!(
|
||||
@@ -1594,4 +1816,217 @@ mod test {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn copy_delta_prefix_smoke() {
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use bytes::Bytes;
|
||||
|
||||
let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke").unwrap();
|
||||
let (tenant, ctx) = h.load().await;
|
||||
let ctx = &ctx;
|
||||
let timeline = tenant
|
||||
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let initdb_layer = timeline
|
||||
.layers
|
||||
.read()
|
||||
.await
|
||||
.likely_resident_layers()
|
||||
.next()
|
||||
.unwrap();
|
||||
|
||||
{
|
||||
let mut writer = timeline.writer().await;
|
||||
|
||||
let data = [
|
||||
(0x20, 12, Value::Image(Bytes::from_static(b"foobar"))),
|
||||
(
|
||||
0x30,
|
||||
12,
|
||||
Value::WalRecord(NeonWalRecord::Postgres {
|
||||
will_init: false,
|
||||
rec: Bytes::from_static(b"1"),
|
||||
}),
|
||||
),
|
||||
(
|
||||
0x40,
|
||||
12,
|
||||
Value::WalRecord(NeonWalRecord::Postgres {
|
||||
will_init: true,
|
||||
rec: Bytes::from_static(b"2"),
|
||||
}),
|
||||
),
|
||||
// build an oversized value so we cannot extend and existing read over
|
||||
// this
|
||||
(
|
||||
0x50,
|
||||
12,
|
||||
Value::WalRecord(NeonWalRecord::Postgres {
|
||||
will_init: true,
|
||||
rec: {
|
||||
let mut buf =
|
||||
vec![0u8; tenant.conf.max_vectored_read_bytes.0.get() + 1024];
|
||||
buf.iter_mut()
|
||||
.enumerate()
|
||||
.for_each(|(i, slot)| *slot = (i % 256) as u8);
|
||||
Bytes::from(buf)
|
||||
},
|
||||
}),
|
||||
),
|
||||
// because the oversized read cannot be extended further, we are sure to exercise the
|
||||
// builder created on the last round with this:
|
||||
(
|
||||
0x60,
|
||||
12,
|
||||
Value::WalRecord(NeonWalRecord::Postgres {
|
||||
will_init: true,
|
||||
rec: Bytes::from_static(b"3"),
|
||||
}),
|
||||
),
|
||||
(
|
||||
0x60,
|
||||
9,
|
||||
Value::Image(Bytes::from_static(b"something for a different key")),
|
||||
),
|
||||
];
|
||||
|
||||
let mut last_lsn = None;
|
||||
|
||||
for (lsn, key, value) in data {
|
||||
let key = Key::from_i128(key);
|
||||
writer.put(key, Lsn(lsn), &value, ctx).await.unwrap();
|
||||
last_lsn = Some(lsn);
|
||||
}
|
||||
|
||||
writer.finish_write(Lsn(last_lsn.unwrap()));
|
||||
}
|
||||
timeline.freeze_and_flush().await.unwrap();
|
||||
|
||||
let new_layer = timeline
|
||||
.layers
|
||||
.read()
|
||||
.await
|
||||
.likely_resident_layers()
|
||||
.find(|x| x != &initdb_layer)
|
||||
.unwrap();
|
||||
|
||||
// create a copy for the timeline, so we don't overwrite the file
|
||||
let branch = tenant
|
||||
.branch_timeline_test(&timeline, TimelineId::generate(), None, ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(branch.get_ancestor_lsn(), Lsn(0x60));
|
||||
|
||||
// truncating at 0x61 gives us a full copy, otherwise just go backwards until there's just
|
||||
// a single key
|
||||
|
||||
for truncate_at in [0x61, 0x51, 0x41, 0x31, 0x21] {
|
||||
let truncate_at = Lsn(truncate_at);
|
||||
|
||||
let mut writer = DeltaLayerWriter::new(
|
||||
tenant.conf,
|
||||
branch.timeline_id,
|
||||
tenant.tenant_shard_id,
|
||||
Key::MIN,
|
||||
Lsn(0x11)..truncate_at,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let new_layer = new_layer.download_and_keep_resident().await.unwrap();
|
||||
|
||||
new_layer
|
||||
.copy_delta_prefix(&mut writer, truncate_at, ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let copied_layer = writer.finish(Key::MAX, &branch).await.unwrap();
|
||||
|
||||
copied_layer.as_delta(ctx).await.unwrap();
|
||||
|
||||
assert_keys_and_values_eq(
|
||||
new_layer.as_delta(ctx).await.unwrap(),
|
||||
copied_layer.as_delta(ctx).await.unwrap(),
|
||||
truncate_at,
|
||||
ctx,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn assert_keys_and_values_eq(
|
||||
source: &DeltaLayerInner,
|
||||
truncated: &DeltaLayerInner,
|
||||
truncated_at: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) {
|
||||
use futures::future::ready;
|
||||
use futures::stream::TryStreamExt;
|
||||
|
||||
let start_key = [0u8; DELTA_KEY_SIZE];
|
||||
|
||||
let source_reader = FileBlockReader::new(&source.file, source.file_id);
|
||||
let source_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
source.index_start_blk,
|
||||
source.index_root_blk,
|
||||
&source_reader,
|
||||
);
|
||||
let source_stream = source.stream_index_forwards(&source_tree, &start_key, ctx);
|
||||
let source_stream = source_stream.filter(|res| match res {
|
||||
Ok((_, lsn, _)) => ready(lsn < &truncated_at),
|
||||
_ => ready(true),
|
||||
});
|
||||
let mut source_stream = std::pin::pin!(source_stream);
|
||||
|
||||
let truncated_reader = FileBlockReader::new(&truncated.file, truncated.file_id);
|
||||
let truncated_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
truncated.index_start_blk,
|
||||
truncated.index_root_blk,
|
||||
&truncated_reader,
|
||||
);
|
||||
let truncated_stream = truncated.stream_index_forwards(&truncated_tree, &start_key, ctx);
|
||||
let mut truncated_stream = std::pin::pin!(truncated_stream);
|
||||
|
||||
let mut scratch_left = Vec::new();
|
||||
let mut scratch_right = Vec::new();
|
||||
|
||||
loop {
|
||||
let (src, truncated) = (source_stream.try_next(), truncated_stream.try_next());
|
||||
let (src, truncated) = tokio::try_join!(src, truncated).unwrap();
|
||||
|
||||
if src.is_none() {
|
||||
assert!(truncated.is_none());
|
||||
break;
|
||||
}
|
||||
|
||||
let (src, truncated) = (src.unwrap(), truncated.unwrap());
|
||||
|
||||
// because we've filtered the source with Lsn, we should always have the same keys from both.
|
||||
assert_eq!(src.0, truncated.0);
|
||||
assert_eq!(src.1, truncated.1);
|
||||
|
||||
// if this is needed for something else, just drop this assert.
|
||||
assert!(
|
||||
src.2.pos() >= truncated.2.pos(),
|
||||
"value position should not go backwards {} vs. {}",
|
||||
src.2.pos(),
|
||||
truncated.2.pos()
|
||||
);
|
||||
|
||||
scratch_left.clear();
|
||||
let src_cursor = source_reader.block_cursor();
|
||||
let left = src_cursor.read_blob_into_buf(src.2.pos(), &mut scratch_left, ctx);
|
||||
scratch_right.clear();
|
||||
let trunc_cursor = truncated_reader.block_cursor();
|
||||
let right = trunc_cursor.read_blob_into_buf(truncated.2.pos(), &mut scratch_right, ctx);
|
||||
|
||||
tokio::try_join!(left, right).unwrap();
|
||||
|
||||
assert_eq!(utils::Hex(&scratch_left), utils::Hex(&scratch_right));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -116,6 +116,12 @@ impl AsLayerDesc for Layer {
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for Layer {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
Arc::as_ptr(&self.0) == Arc::as_ptr(&other.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Layer {
|
||||
/// Creates a layer value for a file we know to not be resident.
|
||||
pub(crate) fn for_evicted(
|
||||
@@ -604,9 +610,17 @@ enum Status {
|
||||
|
||||
impl Drop for LayerInner {
|
||||
fn drop(&mut self) {
|
||||
// if there was a pending eviction, mark it cancelled here to balance metrics
|
||||
if let Some((ResidentOrWantedEvicted::WantedEvicted(..), _)) = self.inner.take_and_deinit()
|
||||
{
|
||||
// eviction has already been started
|
||||
LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);
|
||||
|
||||
// eviction request is intentionally not honored as no one is present to wait for it
|
||||
// and we could be delaying shutdown for nothing.
|
||||
}
|
||||
|
||||
if !*self.wanted_deleted.get_mut() {
|
||||
// should we try to evict if the last wish was for eviction? seems more like a hazard
|
||||
// than a clear win.
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1552,8 +1566,8 @@ impl Drop for DownloadedLayer {
|
||||
if let Some(owner) = self.owner.upgrade() {
|
||||
owner.on_downloaded_layer_drop(self.version);
|
||||
} else {
|
||||
// no need to do anything, we are shutting down
|
||||
LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);
|
||||
// Layer::drop will handle cancelling the eviction; because of drop order and
|
||||
// `DownloadedLayer` never leaking, we cannot know here if eviction was requested.
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1752,6 +1766,28 @@ impl ResidentLayer {
|
||||
}
|
||||
}
|
||||
|
||||
/// FIXME: truncate is bad name because we are not truncating anything, but copying the
|
||||
/// filtered parts.
|
||||
#[cfg(test)]
|
||||
pub(super) async fn copy_delta_prefix(
|
||||
&self,
|
||||
writer: &mut super::delta_layer::DeltaLayerWriter,
|
||||
truncate_at: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
use LayerKind::*;
|
||||
|
||||
let owner = &self.owner.0;
|
||||
|
||||
match self.downloaded.get(owner, ctx).await? {
|
||||
Delta(ref d) => d
|
||||
.copy_prefix(writer, truncate_at, ctx)
|
||||
.await
|
||||
.with_context(|| format!("truncate {self}")),
|
||||
Image(_) => anyhow::bail!(format!("cannot truncate image layer {self}")),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn local_path(&self) -> &Utf8Path {
|
||||
&self.owner.0.path
|
||||
}
|
||||
@@ -1761,14 +1797,14 @@ impl ResidentLayer {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) async fn get_inner_delta<'a>(
|
||||
&'a self,
|
||||
pub(crate) async fn as_delta(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<&'a delta_layer::DeltaLayerInner> {
|
||||
let owner = &self.owner.0;
|
||||
match self.downloaded.get(owner, ctx).await? {
|
||||
LayerKind::Delta(d) => Ok(d),
|
||||
LayerKind::Image(_) => Err(anyhow::anyhow!("Expected a delta layer")),
|
||||
) -> anyhow::Result<&delta_layer::DeltaLayerInner> {
|
||||
use LayerKind::*;
|
||||
match self.downloaded.get(&self.owner.0, ctx).await? {
|
||||
Delta(ref d) => Ok(d),
|
||||
Image(_) => Err(anyhow::anyhow!("image layer")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -721,6 +721,103 @@ async fn evict_and_wait_does_not_wait_for_download() {
|
||||
layer.evict_and_wait(FOREVER).await.unwrap();
|
||||
}
|
||||
|
||||
/// Asserts that there is no miscalculation when Layer is dropped while it is being kept resident,
|
||||
/// which is the last value.
|
||||
///
|
||||
/// Also checks that the same does not happen on a non-evicted layer (regression test).
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn eviction_cancellation_on_drop() {
|
||||
use crate::repository::Value;
|
||||
use bytes::Bytes;
|
||||
|
||||
// this is the runtime on which Layer spawns the blocking tasks on
|
||||
let handle = tokio::runtime::Handle::current();
|
||||
|
||||
let h = TenantHarness::create("eviction_cancellation_on_drop").unwrap();
|
||||
utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
|
||||
let (tenant, ctx) = h.load().await;
|
||||
|
||||
let timeline = tenant
|
||||
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
{
|
||||
// create_test_timeline wrote us one layer, write another
|
||||
let mut writer = timeline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
Key::from_i128(5),
|
||||
Lsn(0x20),
|
||||
&Value::Image(Bytes::from_static(b"this does not matter either")),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
writer.finish_write(Lsn(0x20));
|
||||
}
|
||||
|
||||
timeline.freeze_and_flush().await.unwrap();
|
||||
|
||||
// wait for the upload to complete so our Arc::strong_count assertion holds
|
||||
timeline
|
||||
.remote_client
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.wait_completion()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let (evicted_layer, not_evicted) = {
|
||||
let mut layers = {
|
||||
let mut guard = timeline.layers.write().await;
|
||||
let layers = guard.likely_resident_layers().collect::<Vec<_>>();
|
||||
// remove the layers from layermap
|
||||
guard.finish_gc_timeline(&layers);
|
||||
|
||||
layers
|
||||
};
|
||||
|
||||
assert_eq!(layers.len(), 2);
|
||||
|
||||
(layers.pop().unwrap(), layers.pop().unwrap())
|
||||
};
|
||||
|
||||
let victims = [(evicted_layer, true), (not_evicted, false)];
|
||||
|
||||
for (victim, evict) in victims {
|
||||
let resident = victim.keep_resident().await.unwrap();
|
||||
drop(victim);
|
||||
|
||||
assert_eq!(Arc::strong_count(&resident.owner.0), 1);
|
||||
|
||||
if evict {
|
||||
let evict_and_wait = resident.owner.evict_and_wait(FOREVER);
|
||||
|
||||
// drive the future to await on the status channel, and then drop it
|
||||
tokio::time::timeout(ADVANCE, evict_and_wait)
|
||||
.await
|
||||
.expect_err("should had been a timeout since we are holding the layer resident");
|
||||
}
|
||||
|
||||
// 1 == we only evict one of the layers
|
||||
assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
|
||||
|
||||
drop(resident);
|
||||
|
||||
// run any spawned
|
||||
tokio::time::sleep(ADVANCE).await;
|
||||
|
||||
SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
|
||||
|
||||
assert_eq!(
|
||||
1,
|
||||
LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::LayerGone].get()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn layer_size() {
|
||||
assert_eq!(std::mem::size_of::<LayerAccessStats>(), 2040);
|
||||
|
||||
@@ -2968,7 +2968,8 @@ impl Timeline {
|
||||
break;
|
||||
}
|
||||
|
||||
cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
|
||||
// Take the min to avoid reconstructing a page with data newer than request Lsn.
|
||||
cont_lsn = std::cmp::min(Lsn(request_lsn.0 + 1), Lsn(timeline.ancestor_lsn.0 + 1));
|
||||
timeline_owned = timeline
|
||||
.get_ready_ancestor_timeline(ctx)
|
||||
.await
|
||||
|
||||
@@ -61,18 +61,18 @@ pub struct VectoredRead {
|
||||
}
|
||||
|
||||
impl VectoredRead {
|
||||
pub fn size(&self) -> usize {
|
||||
pub(crate) fn size(&self) -> usize {
|
||||
(self.end - self.start) as usize
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq)]
|
||||
enum VectoredReadExtended {
|
||||
pub(crate) enum VectoredReadExtended {
|
||||
Yes,
|
||||
No,
|
||||
}
|
||||
|
||||
struct VectoredReadBuilder {
|
||||
pub(crate) struct VectoredReadBuilder {
|
||||
start: u64,
|
||||
end: u64,
|
||||
blobs_at: VecMap<u64, BlobMeta>,
|
||||
@@ -80,7 +80,17 @@ struct VectoredReadBuilder {
|
||||
}
|
||||
|
||||
impl VectoredReadBuilder {
|
||||
fn new(start_offset: u64, end_offset: u64, meta: BlobMeta, max_read_size: usize) -> Self {
|
||||
/// Start building a new vectored read.
|
||||
///
|
||||
/// Note that by design, this does not check against reading more than `max_read_size` to
|
||||
/// support reading larger blobs than the configuration value. The builder will be single use
|
||||
/// however after that.
|
||||
pub(crate) fn new(
|
||||
start_offset: u64,
|
||||
end_offset: u64,
|
||||
meta: BlobMeta,
|
||||
max_read_size: usize,
|
||||
) -> Self {
|
||||
let mut blobs_at = VecMap::default();
|
||||
blobs_at
|
||||
.append(start_offset, meta)
|
||||
@@ -97,7 +107,8 @@ impl VectoredReadBuilder {
|
||||
/// Attempt to extend the current read with a new blob if the start
|
||||
/// offset matches with the current end of the vectored read
|
||||
/// and the resuting size is below the max read size
|
||||
fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
|
||||
pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
|
||||
tracing::trace!(start, end, "trying to extend");
|
||||
let size = (end - start) as usize;
|
||||
if self.end == start && self.size() + size <= self.max_read_size {
|
||||
self.end = end;
|
||||
@@ -111,11 +122,11 @@ impl VectoredReadBuilder {
|
||||
VectoredReadExtended::No
|
||||
}
|
||||
|
||||
fn size(&self) -> usize {
|
||||
pub(crate) fn size(&self) -> usize {
|
||||
(self.end - self.start) as usize
|
||||
}
|
||||
|
||||
fn build(self) -> VectoredRead {
|
||||
pub(crate) fn build(self) -> VectoredRead {
|
||||
VectoredRead {
|
||||
start: self.start,
|
||||
end: self.end,
|
||||
|
||||
@@ -55,6 +55,7 @@ impl NeonWalRecord {
|
||||
/// Does replaying this WAL record initialize the page from scratch, or does
|
||||
/// it need to be applied over the previous image of the page?
|
||||
pub fn will_init(&self) -> bool {
|
||||
// If you change this function, you'll also need to change ValueBytes::will_init
|
||||
match self {
|
||||
NeonWalRecord::Postgres { will_init, rec: _ } => *will_init,
|
||||
|
||||
|
||||
156
poetry.lock
generated
156
poetry.lock
generated
@@ -2,87 +2,87 @@
|
||||
|
||||
[[package]]
|
||||
name = "aiohttp"
|
||||
version = "3.9.2"
|
||||
version = "3.9.4"
|
||||
description = "Async http client/server framework (asyncio)"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
files = [
|
||||
{file = "aiohttp-3.9.2-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:772fbe371788e61c58d6d3d904268e48a594ba866804d08c995ad71b144f94cb"},
|
||||
{file = "aiohttp-3.9.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:edd4f1af2253f227ae311ab3d403d0c506c9b4410c7fc8d9573dec6d9740369f"},
|
||||
{file = "aiohttp-3.9.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:cfee9287778399fdef6f8a11c9e425e1cb13cc9920fd3a3df8f122500978292b"},
|
||||
{file = "aiohttp-3.9.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3cc158466f6a980a6095ee55174d1de5730ad7dec251be655d9a6a9dd7ea1ff9"},
|
||||
{file = "aiohttp-3.9.2-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:54ec82f45d57c9a65a1ead3953b51c704f9587440e6682f689da97f3e8defa35"},
|
||||
{file = "aiohttp-3.9.2-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:abeb813a18eb387f0d835ef51f88568540ad0325807a77a6e501fed4610f864e"},
|
||||
{file = "aiohttp-3.9.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:cc91d07280d7d169f3a0f9179d8babd0ee05c79d4d891447629ff0d7d8089ec2"},
|
||||
{file = "aiohttp-3.9.2-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b65e861f4bebfb660f7f0f40fa3eb9f2ab9af10647d05dac824390e7af8f75b7"},
|
||||
{file = "aiohttp-3.9.2-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:04fd8ffd2be73d42bcf55fd78cde7958eeee6d4d8f73c3846b7cba491ecdb570"},
|
||||
{file = "aiohttp-3.9.2-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:3d8d962b439a859b3ded9a1e111a4615357b01620a546bc601f25b0211f2da81"},
|
||||
{file = "aiohttp-3.9.2-cp310-cp310-musllinux_1_1_ppc64le.whl", hash = "sha256:8ceb658afd12b27552597cf9a65d9807d58aef45adbb58616cdd5ad4c258c39e"},
|
||||
{file = "aiohttp-3.9.2-cp310-cp310-musllinux_1_1_s390x.whl", hash = "sha256:0e4ee4df741670560b1bc393672035418bf9063718fee05e1796bf867e995fad"},
|
||||
{file = "aiohttp-3.9.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:2dec87a556f300d3211decf018bfd263424f0690fcca00de94a837949fbcea02"},
|
||||
{file = "aiohttp-3.9.2-cp310-cp310-win32.whl", hash = "sha256:3e1a800f988ce7c4917f34096f81585a73dbf65b5c39618b37926b1238cf9bc4"},
|
||||
{file = "aiohttp-3.9.2-cp310-cp310-win_amd64.whl", hash = "sha256:ea510718a41b95c236c992b89fdfc3d04cc7ca60281f93aaada497c2b4e05c46"},
|
||||
{file = "aiohttp-3.9.2-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:6aaa6f99256dd1b5756a50891a20f0d252bd7bdb0854c5d440edab4495c9f973"},
|
||||
{file = "aiohttp-3.9.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:a27d8c70ad87bcfce2e97488652075a9bdd5b70093f50b10ae051dfe5e6baf37"},
|
||||
{file = "aiohttp-3.9.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:54287bcb74d21715ac8382e9de146d9442b5f133d9babb7e5d9e453faadd005e"},
|
||||
{file = "aiohttp-3.9.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5bb3d05569aa83011fcb346b5266e00b04180105fcacc63743fc2e4a1862a891"},
|
||||
{file = "aiohttp-3.9.2-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:c8534e7d69bb8e8d134fe2be9890d1b863518582f30c9874ed7ed12e48abe3c4"},
|
||||
{file = "aiohttp-3.9.2-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:4bd9d5b989d57b41e4ff56ab250c5ddf259f32db17159cce630fd543376bd96b"},
|
||||
{file = "aiohttp-3.9.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fa6904088e6642609981f919ba775838ebf7df7fe64998b1a954fb411ffb4663"},
|
||||
{file = "aiohttp-3.9.2-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:bda42eb410be91b349fb4ee3a23a30ee301c391e503996a638d05659d76ea4c2"},
|
||||
{file = "aiohttp-3.9.2-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:193cc1ccd69d819562cc7f345c815a6fc51d223b2ef22f23c1a0f67a88de9a72"},
|
||||
{file = "aiohttp-3.9.2-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:b9f1cb839b621f84a5b006848e336cf1496688059d2408e617af33e3470ba204"},
|
||||
{file = "aiohttp-3.9.2-cp311-cp311-musllinux_1_1_ppc64le.whl", hash = "sha256:d22a0931848b8c7a023c695fa2057c6aaac19085f257d48baa24455e67df97ec"},
|
||||
{file = "aiohttp-3.9.2-cp311-cp311-musllinux_1_1_s390x.whl", hash = "sha256:4112d8ba61fbd0abd5d43a9cb312214565b446d926e282a6d7da3f5a5aa71d36"},
|
||||
{file = "aiohttp-3.9.2-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:c4ad4241b52bb2eb7a4d2bde060d31c2b255b8c6597dd8deac2f039168d14fd7"},
|
||||
{file = "aiohttp-3.9.2-cp311-cp311-win32.whl", hash = "sha256:ee2661a3f5b529f4fc8a8ffee9f736ae054adfb353a0d2f78218be90617194b3"},
|
||||
{file = "aiohttp-3.9.2-cp311-cp311-win_amd64.whl", hash = "sha256:4deae2c165a5db1ed97df2868ef31ca3cc999988812e82386d22937d9d6fed52"},
|
||||
{file = "aiohttp-3.9.2-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:6f4cdba12539215aaecf3c310ce9d067b0081a0795dd8a8805fdb67a65c0572a"},
|
||||
{file = "aiohttp-3.9.2-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:84e843b33d5460a5c501c05539809ff3aee07436296ff9fbc4d327e32aa3a326"},
|
||||
{file = "aiohttp-3.9.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:8008d0f451d66140a5aa1c17e3eedc9d56e14207568cd42072c9d6b92bf19b52"},
|
||||
{file = "aiohttp-3.9.2-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:61c47ab8ef629793c086378b1df93d18438612d3ed60dca76c3422f4fbafa792"},
|
||||
{file = "aiohttp-3.9.2-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:bc71f748e12284312f140eaa6599a520389273174b42c345d13c7e07792f4f57"},
|
||||
{file = "aiohttp-3.9.2-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:a1c3a4d0ab2f75f22ec80bca62385db2e8810ee12efa8c9e92efea45c1849133"},
|
||||
{file = "aiohttp-3.9.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9a87aa0b13bbee025faa59fa58861303c2b064b9855d4c0e45ec70182bbeba1b"},
|
||||
{file = "aiohttp-3.9.2-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e2cc0d04688b9f4a7854c56c18aa7af9e5b0a87a28f934e2e596ba7e14783192"},
|
||||
{file = "aiohttp-3.9.2-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:1956e3ac376b1711c1533266dec4efd485f821d84c13ce1217d53e42c9e65f08"},
|
||||
{file = "aiohttp-3.9.2-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:114da29f39eccd71b93a0fcacff178749a5c3559009b4a4498c2c173a6d74dff"},
|
||||
{file = "aiohttp-3.9.2-cp312-cp312-musllinux_1_1_ppc64le.whl", hash = "sha256:3f17999ae3927d8a9a823a1283b201344a0627272f92d4f3e3a4efe276972fe8"},
|
||||
{file = "aiohttp-3.9.2-cp312-cp312-musllinux_1_1_s390x.whl", hash = "sha256:f31df6a32217a34ae2f813b152a6f348154f948c83213b690e59d9e84020925c"},
|
||||
{file = "aiohttp-3.9.2-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:7a75307ffe31329928a8d47eae0692192327c599113d41b278d4c12b54e1bd11"},
|
||||
{file = "aiohttp-3.9.2-cp312-cp312-win32.whl", hash = "sha256:972b63d589ff8f305463593050a31b5ce91638918da38139b9d8deaba9e0fed7"},
|
||||
{file = "aiohttp-3.9.2-cp312-cp312-win_amd64.whl", hash = "sha256:200dc0246f0cb5405c80d18ac905c8350179c063ea1587580e3335bfc243ba6a"},
|
||||
{file = "aiohttp-3.9.2-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:158564d0d1020e0d3fe919a81d97aadad35171e13e7b425b244ad4337fc6793a"},
|
||||
{file = "aiohttp-3.9.2-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:da1346cd0ccb395f0ed16b113ebb626fa43b7b07fd7344fce33e7a4f04a8897a"},
|
||||
{file = "aiohttp-3.9.2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:eaa9256de26ea0334ffa25f1913ae15a51e35c529a1ed9af8e6286dd44312554"},
|
||||
{file = "aiohttp-3.9.2-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1543e7fb00214fb4ccead42e6a7d86f3bb7c34751ec7c605cca7388e525fd0b4"},
|
||||
{file = "aiohttp-3.9.2-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:186e94570433a004e05f31f632726ae0f2c9dee4762a9ce915769ce9c0a23d89"},
|
||||
{file = "aiohttp-3.9.2-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:d52d20832ac1560f4510d68e7ba8befbc801a2b77df12bd0cd2bcf3b049e52a4"},
|
||||
{file = "aiohttp-3.9.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1c45e4e815ac6af3b72ca2bde9b608d2571737bb1e2d42299fc1ffdf60f6f9a1"},
|
||||
{file = "aiohttp-3.9.2-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:aa906b9bdfd4a7972dd0628dbbd6413d2062df5b431194486a78f0d2ae87bd55"},
|
||||
{file = "aiohttp-3.9.2-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:68bbee9e17d66f17bb0010aa15a22c6eb28583edcc8b3212e2b8e3f77f3ebe2a"},
|
||||
{file = "aiohttp-3.9.2-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:4c189b64bd6d9a403a1a3f86a3ab3acbc3dc41a68f73a268a4f683f89a4dec1f"},
|
||||
{file = "aiohttp-3.9.2-cp38-cp38-musllinux_1_1_ppc64le.whl", hash = "sha256:8a7876f794523123bca6d44bfecd89c9fec9ec897a25f3dd202ee7fc5c6525b7"},
|
||||
{file = "aiohttp-3.9.2-cp38-cp38-musllinux_1_1_s390x.whl", hash = "sha256:d23fba734e3dd7b1d679b9473129cd52e4ec0e65a4512b488981a56420e708db"},
|
||||
{file = "aiohttp-3.9.2-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:b141753be581fab842a25cb319f79536d19c2a51995d7d8b29ee290169868eab"},
|
||||
{file = "aiohttp-3.9.2-cp38-cp38-win32.whl", hash = "sha256:103daf41ff3b53ba6fa09ad410793e2e76c9d0269151812e5aba4b9dd674a7e8"},
|
||||
{file = "aiohttp-3.9.2-cp38-cp38-win_amd64.whl", hash = "sha256:328918a6c2835861ff7afa8c6d2c70c35fdaf996205d5932351bdd952f33fa2f"},
|
||||
{file = "aiohttp-3.9.2-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:5264d7327c9464786f74e4ec9342afbbb6ee70dfbb2ec9e3dfce7a54c8043aa3"},
|
||||
{file = "aiohttp-3.9.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:07205ae0015e05c78b3288c1517afa000823a678a41594b3fdc870878d645305"},
|
||||
{file = "aiohttp-3.9.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:ae0a1e638cffc3ec4d4784b8b4fd1cf28968febc4bd2718ffa25b99b96a741bd"},
|
||||
{file = "aiohttp-3.9.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d43302a30ba1166325974858e6ef31727a23bdd12db40e725bec0f759abce505"},
|
||||
{file = "aiohttp-3.9.2-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:16a967685907003765855999af11a79b24e70b34dc710f77a38d21cd9fc4f5fe"},
|
||||
{file = "aiohttp-3.9.2-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:6fa3ee92cd441d5c2d07ca88d7a9cef50f7ec975f0117cd0c62018022a184308"},
|
||||
{file = "aiohttp-3.9.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0b500c5ad9c07639d48615a770f49618130e61be36608fc9bc2d9bae31732b8f"},
|
||||
{file = "aiohttp-3.9.2-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c07327b368745b1ce2393ae9e1aafed7073d9199e1dcba14e035cc646c7941bf"},
|
||||
{file = "aiohttp-3.9.2-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:cc7d6502c23a0ec109687bf31909b3fb7b196faf198f8cff68c81b49eb316ea9"},
|
||||
{file = "aiohttp-3.9.2-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:07be2be7071723c3509ab5c08108d3a74f2181d4964e869f2504aaab68f8d3e8"},
|
||||
{file = "aiohttp-3.9.2-cp39-cp39-musllinux_1_1_ppc64le.whl", hash = "sha256:122468f6fee5fcbe67cb07014a08c195b3d4c41ff71e7b5160a7bcc41d585a5f"},
|
||||
{file = "aiohttp-3.9.2-cp39-cp39-musllinux_1_1_s390x.whl", hash = "sha256:00a9abcea793c81e7f8778ca195a1714a64f6d7436c4c0bb168ad2a212627000"},
|
||||
{file = "aiohttp-3.9.2-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:7a9825fdd64ecac5c670234d80bb52bdcaa4139d1f839165f548208b3779c6c6"},
|
||||
{file = "aiohttp-3.9.2-cp39-cp39-win32.whl", hash = "sha256:5422cd9a4a00f24c7244e1b15aa9b87935c85fb6a00c8ac9b2527b38627a9211"},
|
||||
{file = "aiohttp-3.9.2-cp39-cp39-win_amd64.whl", hash = "sha256:7d579dcd5d82a86a46f725458418458fa43686f6a7b252f2966d359033ffc8ab"},
|
||||
{file = "aiohttp-3.9.2.tar.gz", hash = "sha256:b0ad0a5e86ce73f5368a164c10ada10504bf91869c05ab75d982c6048217fbf7"},
|
||||
{file = "aiohttp-3.9.4-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:76d32588ef7e4a3f3adff1956a0ba96faabbdee58f2407c122dd45aa6e34f372"},
|
||||
{file = "aiohttp-3.9.4-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:56181093c10dbc6ceb8a29dfeea1e815e1dfdc020169203d87fd8d37616f73f9"},
|
||||
{file = "aiohttp-3.9.4-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c7a5b676d3c65e88b3aca41816bf72831898fcd73f0cbb2680e9d88e819d1e4d"},
|
||||
{file = "aiohttp-3.9.4-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d1df528a85fb404899d4207a8d9934cfd6be626e30e5d3a5544a83dbae6d8a7e"},
|
||||
{file = "aiohttp-3.9.4-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:f595db1bceabd71c82e92df212dd9525a8a2c6947d39e3c994c4f27d2fe15b11"},
|
||||
{file = "aiohttp-3.9.4-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:9c0b09d76e5a4caac3d27752027fbd43dc987b95f3748fad2b924a03fe8632ad"},
|
||||
{file = "aiohttp-3.9.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:689eb4356649ec9535b3686200b231876fb4cab4aca54e3bece71d37f50c1d13"},
|
||||
{file = "aiohttp-3.9.4-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a3666cf4182efdb44d73602379a66f5fdfd5da0db5e4520f0ac0dcca644a3497"},
|
||||
{file = "aiohttp-3.9.4-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:b65b0f8747b013570eea2f75726046fa54fa8e0c5db60f3b98dd5d161052004a"},
|
||||
{file = "aiohttp-3.9.4-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:a1885d2470955f70dfdd33a02e1749613c5a9c5ab855f6db38e0b9389453dce7"},
|
||||
{file = "aiohttp-3.9.4-cp310-cp310-musllinux_1_1_ppc64le.whl", hash = "sha256:0593822dcdb9483d41f12041ff7c90d4d1033ec0e880bcfaf102919b715f47f1"},
|
||||
{file = "aiohttp-3.9.4-cp310-cp310-musllinux_1_1_s390x.whl", hash = "sha256:47f6eb74e1ecb5e19a78f4a4228aa24df7fbab3b62d4a625d3f41194a08bd54f"},
|
||||
{file = "aiohttp-3.9.4-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:c8b04a3dbd54de6ccb7604242fe3ad67f2f3ca558f2d33fe19d4b08d90701a89"},
|
||||
{file = "aiohttp-3.9.4-cp310-cp310-win32.whl", hash = "sha256:8a78dfb198a328bfb38e4308ca8167028920fb747ddcf086ce706fbdd23b2926"},
|
||||
{file = "aiohttp-3.9.4-cp310-cp310-win_amd64.whl", hash = "sha256:e78da6b55275987cbc89141a1d8e75f5070e577c482dd48bd9123a76a96f0bbb"},
|
||||
{file = "aiohttp-3.9.4-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:c111b3c69060d2bafc446917534150fd049e7aedd6cbf21ba526a5a97b4402a5"},
|
||||
{file = "aiohttp-3.9.4-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:efbdd51872cf170093998c87ccdf3cb5993add3559341a8e5708bcb311934c94"},
|
||||
{file = "aiohttp-3.9.4-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:7bfdb41dc6e85d8535b00d73947548a748e9534e8e4fddd2638109ff3fb081df"},
|
||||
{file = "aiohttp-3.9.4-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2bd9d334412961125e9f68d5b73c1d0ab9ea3f74a58a475e6b119f5293eee7ba"},
|
||||
{file = "aiohttp-3.9.4-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:35d78076736f4a668d57ade00c65d30a8ce28719d8a42471b2a06ccd1a2e3063"},
|
||||
{file = "aiohttp-3.9.4-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:824dff4f9f4d0f59d0fa3577932ee9a20e09edec8a2f813e1d6b9f89ced8293f"},
|
||||
{file = "aiohttp-3.9.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:52b8b4e06fc15519019e128abedaeb56412b106ab88b3c452188ca47a25c4093"},
|
||||
{file = "aiohttp-3.9.4-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:eae569fb1e7559d4f3919965617bb39f9e753967fae55ce13454bec2d1c54f09"},
|
||||
{file = "aiohttp-3.9.4-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:69b97aa5792428f321f72aeb2f118e56893371f27e0b7d05750bcad06fc42ca1"},
|
||||
{file = "aiohttp-3.9.4-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:4d79aad0ad4b980663316f26d9a492e8fab2af77c69c0f33780a56843ad2f89e"},
|
||||
{file = "aiohttp-3.9.4-cp311-cp311-musllinux_1_1_ppc64le.whl", hash = "sha256:d6577140cd7db19e430661e4b2653680194ea8c22c994bc65b7a19d8ec834403"},
|
||||
{file = "aiohttp-3.9.4-cp311-cp311-musllinux_1_1_s390x.whl", hash = "sha256:9860d455847cd98eb67897f5957b7cd69fbcb436dd3f06099230f16a66e66f79"},
|
||||
{file = "aiohttp-3.9.4-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:69ff36d3f8f5652994e08bd22f093e11cfd0444cea310f92e01b45a4e46b624e"},
|
||||
{file = "aiohttp-3.9.4-cp311-cp311-win32.whl", hash = "sha256:e27d3b5ed2c2013bce66ad67ee57cbf614288bda8cdf426c8d8fe548316f1b5f"},
|
||||
{file = "aiohttp-3.9.4-cp311-cp311-win_amd64.whl", hash = "sha256:d6a67e26daa686a6fbdb600a9af8619c80a332556245fa8e86c747d226ab1a1e"},
|
||||
{file = "aiohttp-3.9.4-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:c5ff8ff44825736a4065d8544b43b43ee4c6dd1530f3a08e6c0578a813b0aa35"},
|
||||
{file = "aiohttp-3.9.4-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:d12a244627eba4e9dc52cbf924edef905ddd6cafc6513849b4876076a6f38b0e"},
|
||||
{file = "aiohttp-3.9.4-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:dcad56c8d8348e7e468899d2fb3b309b9bc59d94e6db08710555f7436156097f"},
|
||||
{file = "aiohttp-3.9.4-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4f7e69a7fd4b5ce419238388e55abd220336bd32212c673ceabc57ccf3d05b55"},
|
||||
{file = "aiohttp-3.9.4-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:c4870cb049f10d7680c239b55428916d84158798eb8f353e74fa2c98980dcc0b"},
|
||||
{file = "aiohttp-3.9.4-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:3b2feaf1b7031ede1bc0880cec4b0776fd347259a723d625357bb4b82f62687b"},
|
||||
{file = "aiohttp-3.9.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:939393e8c3f0a5bcd33ef7ace67680c318dc2ae406f15e381c0054dd658397de"},
|
||||
{file = "aiohttp-3.9.4-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7d2334e387b2adcc944680bebcf412743f2caf4eeebd550f67249c1c3696be04"},
|
||||
{file = "aiohttp-3.9.4-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:e0198ea897680e480845ec0ffc5a14e8b694e25b3f104f63676d55bf76a82f1a"},
|
||||
{file = "aiohttp-3.9.4-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:e40d2cd22914d67c84824045861a5bb0fb46586b15dfe4f046c7495bf08306b2"},
|
||||
{file = "aiohttp-3.9.4-cp312-cp312-musllinux_1_1_ppc64le.whl", hash = "sha256:aba80e77c227f4234aa34a5ff2b6ff30c5d6a827a91d22ff6b999de9175d71bd"},
|
||||
{file = "aiohttp-3.9.4-cp312-cp312-musllinux_1_1_s390x.whl", hash = "sha256:fb68dc73bc8ac322d2e392a59a9e396c4f35cb6fdbdd749e139d1d6c985f2527"},
|
||||
{file = "aiohttp-3.9.4-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:f3460a92638dce7e47062cf088d6e7663adb135e936cb117be88d5e6c48c9d53"},
|
||||
{file = "aiohttp-3.9.4-cp312-cp312-win32.whl", hash = "sha256:32dc814ddbb254f6170bca198fe307920f6c1308a5492f049f7f63554b88ef36"},
|
||||
{file = "aiohttp-3.9.4-cp312-cp312-win_amd64.whl", hash = "sha256:63f41a909d182d2b78fe3abef557fcc14da50c7852f70ae3be60e83ff64edba5"},
|
||||
{file = "aiohttp-3.9.4-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:c3770365675f6be220032f6609a8fbad994d6dcf3ef7dbcf295c7ee70884c9af"},
|
||||
{file = "aiohttp-3.9.4-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:305edae1dea368ce09bcb858cf5a63a064f3bff4767dec6fa60a0cc0e805a1d3"},
|
||||
{file = "aiohttp-3.9.4-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:6f121900131d116e4a93b55ab0d12ad72573f967b100e49086e496a9b24523ea"},
|
||||
{file = "aiohttp-3.9.4-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b71e614c1ae35c3d62a293b19eface83d5e4d194e3eb2fabb10059d33e6e8cbf"},
|
||||
{file = "aiohttp-3.9.4-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:419f009fa4cfde4d16a7fc070d64f36d70a8d35a90d71aa27670bba2be4fd039"},
|
||||
{file = "aiohttp-3.9.4-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:7b39476ee69cfe64061fd77a73bf692c40021f8547cda617a3466530ef63f947"},
|
||||
{file = "aiohttp-3.9.4-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b33f34c9c7decdb2ab99c74be6443942b730b56d9c5ee48fb7df2c86492f293c"},
|
||||
{file = "aiohttp-3.9.4-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c78700130ce2dcebb1a8103202ae795be2fa8c9351d0dd22338fe3dac74847d9"},
|
||||
{file = "aiohttp-3.9.4-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:268ba22d917655d1259af2d5659072b7dc11b4e1dc2cb9662fdd867d75afc6a4"},
|
||||
{file = "aiohttp-3.9.4-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:17e7c051f53a0d2ebf33013a9cbf020bb4e098c4bc5bce6f7b0c962108d97eab"},
|
||||
{file = "aiohttp-3.9.4-cp38-cp38-musllinux_1_1_ppc64le.whl", hash = "sha256:7be99f4abb008cb38e144f85f515598f4c2c8932bf11b65add0ff59c9c876d99"},
|
||||
{file = "aiohttp-3.9.4-cp38-cp38-musllinux_1_1_s390x.whl", hash = "sha256:d58a54d6ff08d2547656356eea8572b224e6f9bbc0cf55fa9966bcaac4ddfb10"},
|
||||
{file = "aiohttp-3.9.4-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:7673a76772bda15d0d10d1aa881b7911d0580c980dbd16e59d7ba1422b2d83cd"},
|
||||
{file = "aiohttp-3.9.4-cp38-cp38-win32.whl", hash = "sha256:e4370dda04dc8951012f30e1ce7956a0a226ac0714a7b6c389fb2f43f22a250e"},
|
||||
{file = "aiohttp-3.9.4-cp38-cp38-win_amd64.whl", hash = "sha256:eb30c4510a691bb87081192a394fb661860e75ca3896c01c6d186febe7c88530"},
|
||||
{file = "aiohttp-3.9.4-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:84e90494db7df3be5e056f91412f9fa9e611fbe8ce4aaef70647297f5943b276"},
|
||||
{file = "aiohttp-3.9.4-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:7d4845f8501ab28ebfdbeab980a50a273b415cf69e96e4e674d43d86a464df9d"},
|
||||
{file = "aiohttp-3.9.4-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:69046cd9a2a17245c4ce3c1f1a4ff8c70c7701ef222fce3d1d8435f09042bba1"},
|
||||
{file = "aiohttp-3.9.4-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8b73a06bafc8dcc508420db43b4dd5850e41e69de99009d0351c4f3007960019"},
|
||||
{file = "aiohttp-3.9.4-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:418bb0038dfafeac923823c2e63226179976c76f981a2aaad0ad5d51f2229bca"},
|
||||
{file = "aiohttp-3.9.4-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:71a8f241456b6c2668374d5d28398f8e8cdae4cce568aaea54e0f39359cd928d"},
|
||||
{file = "aiohttp-3.9.4-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:935c369bf8acc2dc26f6eeb5222768aa7c62917c3554f7215f2ead7386b33748"},
|
||||
{file = "aiohttp-3.9.4-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:74e4e48c8752d14ecfb36d2ebb3d76d614320570e14de0a3aa7a726ff150a03c"},
|
||||
{file = "aiohttp-3.9.4-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:916b0417aeddf2c8c61291238ce25286f391a6acb6f28005dd9ce282bd6311b6"},
|
||||
{file = "aiohttp-3.9.4-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:9b6787b6d0b3518b2ee4cbeadd24a507756ee703adbac1ab6dc7c4434b8c572a"},
|
||||
{file = "aiohttp-3.9.4-cp39-cp39-musllinux_1_1_ppc64le.whl", hash = "sha256:221204dbda5ef350e8db6287937621cf75e85778b296c9c52260b522231940ed"},
|
||||
{file = "aiohttp-3.9.4-cp39-cp39-musllinux_1_1_s390x.whl", hash = "sha256:10afd99b8251022ddf81eaed1d90f5a988e349ee7d779eb429fb07b670751e8c"},
|
||||
{file = "aiohttp-3.9.4-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:2506d9f7a9b91033201be9ffe7d89c6a54150b0578803cce5cb84a943d075bc3"},
|
||||
{file = "aiohttp-3.9.4-cp39-cp39-win32.whl", hash = "sha256:e571fdd9efd65e86c6af2f332e0e95dad259bfe6beb5d15b3c3eca3a6eb5d87b"},
|
||||
{file = "aiohttp-3.9.4-cp39-cp39-win_amd64.whl", hash = "sha256:7d29dd5319d20aa3b7749719ac9685fbd926f71ac8c77b2477272725f882072d"},
|
||||
{file = "aiohttp-3.9.4.tar.gz", hash = "sha256:6ff71ede6d9a5a58cfb7b6fffc83ab5d4a63138276c771ac91ceaaddf5459644"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
@@ -2900,4 +2900,4 @@ cffi = ["cffi (>=1.11)"]
|
||||
[metadata]
|
||||
lock-version = "2.0"
|
||||
python-versions = "^3.9"
|
||||
content-hash = "df7161da4fdc3cba0a445176fc9dda2a0e8a53e13a7aa8a864385ca259381b41"
|
||||
content-hash = "b3452b50901123fd5f2c385ce8a0c1c492296393b8a7926a322b6df0ea3ac572"
|
||||
|
||||
@@ -21,6 +21,7 @@ base64.workspace = true
|
||||
bstr.workspace = true
|
||||
bytes = { workspace = true, features = ["serde"] }
|
||||
camino.workspace = true
|
||||
camino-tempfile.workspace = true
|
||||
chrono.workspace = true
|
||||
clap.workspace = true
|
||||
consumption_metrics.workspace = true
|
||||
@@ -78,7 +79,7 @@ subtle.workspace = true
|
||||
sync_wrapper.workspace = true
|
||||
task-local-extensions.workspace = true
|
||||
thiserror.workspace = true
|
||||
tikv-jemallocator.workspace = true
|
||||
tikv-jemallocator = { workspace = true, features = ["profiling"] }
|
||||
tikv-jemalloc-ctl = { workspace = true, features = ["use_std"] }
|
||||
tokio-postgres.workspace = true
|
||||
tokio-rustls.workspace = true
|
||||
@@ -102,7 +103,6 @@ redis.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
camino-tempfile.workspace = true
|
||||
fallible-iterator.workspace = true
|
||||
rcgen.workspace = true
|
||||
rstest.workspace = true
|
||||
|
||||
@@ -1,13 +1,17 @@
|
||||
use anyhow::{anyhow, bail};
|
||||
use camino::Utf8PathBuf;
|
||||
use camino_tempfile::Utf8TempDir;
|
||||
use hyper::{header::CONTENT_TYPE, Body, Request, Response, StatusCode};
|
||||
use measured::{text::BufferedTextEncoder, MetricGroup};
|
||||
use metrics::NeonMetrics;
|
||||
use once_cell::sync::Lazy;
|
||||
use std::{
|
||||
convert::Infallible,
|
||||
ffi::CString,
|
||||
net::TcpListener,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
use tracing::{info, info_span};
|
||||
use tracing::{info, info_span, warn};
|
||||
use utils::http::{
|
||||
endpoint::{self, request_span},
|
||||
error::ApiError,
|
||||
@@ -21,18 +25,49 @@ async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
json_response(StatusCode::OK, "")
|
||||
}
|
||||
|
||||
async fn prof_dump(_: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
static PROF_MIB: Lazy<jemalloc::dump_mib> =
|
||||
Lazy::new(|| jemalloc::dump::mib().expect("could not create prof.dump MIB"));
|
||||
static PROF_DIR: Lazy<Utf8TempDir> =
|
||||
Lazy::new(|| camino_tempfile::tempdir().expect("could not create tempdir"));
|
||||
static PROF_FILE: Lazy<Utf8PathBuf> = Lazy::new(|| PROF_DIR.path().join("prof.dump"));
|
||||
static PROF_FILE0: Lazy<CString> = Lazy::new(|| CString::new(PROF_FILE.as_str()).unwrap());
|
||||
static DUMP_LOCK: Mutex<()> = Mutex::new(());
|
||||
|
||||
tokio::task::spawn_blocking(|| {
|
||||
let _guard = DUMP_LOCK.lock();
|
||||
PROF_MIB
|
||||
.write(&PROF_FILE0)
|
||||
.expect("could not trigger prof.dump");
|
||||
let prof_dump = std::fs::read_to_string(&*PROF_FILE).expect("could not open prof.dump");
|
||||
|
||||
Response::new(Body::from(prof_dump))
|
||||
})
|
||||
.await
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))
|
||||
}
|
||||
|
||||
fn make_router(metrics: AppMetrics) -> RouterBuilder<hyper::Body, ApiError> {
|
||||
let state = Arc::new(Mutex::new(PrometheusHandler {
|
||||
encoder: BufferedTextEncoder::new(),
|
||||
metrics,
|
||||
}));
|
||||
|
||||
endpoint::make_router()
|
||||
let mut router = endpoint::make_router()
|
||||
.get("/metrics", move |r| {
|
||||
let state = state.clone();
|
||||
request_span(r, move |b| prometheus_metrics_handler(b, state))
|
||||
})
|
||||
.get("/v1/status", status_handler)
|
||||
.get("/v1/status", status_handler);
|
||||
|
||||
let prof_enabled = jemalloc::prof::read().unwrap_or_default();
|
||||
if prof_enabled {
|
||||
warn!("activating jemalloc profiling");
|
||||
jemalloc::active::write(true).unwrap();
|
||||
router = router.get("/v1/jemalloc/prof.dump", prof_dump);
|
||||
}
|
||||
|
||||
router
|
||||
}
|
||||
|
||||
pub async fn task_main(
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::marker::PhantomData;
|
||||
use std::{ffi::CStr, marker::PhantomData};
|
||||
|
||||
use measured::{
|
||||
label::NoLabels,
|
||||
@@ -9,7 +9,9 @@ use measured::{
|
||||
text::TextEncoder,
|
||||
LabelGroup, MetricGroup,
|
||||
};
|
||||
use tikv_jemalloc_ctl::{config, epoch, epoch_mib, stats, version};
|
||||
use tikv_jemalloc_ctl::{
|
||||
config, epoch, epoch_mib, raw, stats, version, Access, AsName, MibStr, Name,
|
||||
};
|
||||
|
||||
pub struct MetricRecorder {
|
||||
epoch: epoch_mib,
|
||||
@@ -114,3 +116,59 @@ jemalloc_gauge!(mapped, mapped_mib);
|
||||
jemalloc_gauge!(metadata, metadata_mib);
|
||||
jemalloc_gauge!(resident, resident_mib);
|
||||
jemalloc_gauge!(retained, retained_mib);
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
pub struct dump;
|
||||
|
||||
impl dump {
|
||||
pub fn mib() -> tikv_jemalloc_ctl::Result<dump_mib> {
|
||||
Ok(dump_mib(b"prof.dump\0".as_slice().name().mib_str()?))
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(transparent)]
|
||||
#[derive(Copy, Clone)]
|
||||
#[allow(non_camel_case_types)]
|
||||
pub struct dump_mib(pub MibStr<[usize; 2]>);
|
||||
|
||||
impl dump_mib {
|
||||
pub fn write(self, value: &'static CStr) -> tikv_jemalloc_ctl::Result<()> {
|
||||
// No support for Access<CStr> yet.
|
||||
// self.0.write(value)
|
||||
let mib = [self.0[0], self.0[1]];
|
||||
raw::write_str_mib(&mib, value.to_bytes_with_nul())
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
pub struct active;
|
||||
|
||||
impl active {
|
||||
pub fn name() -> &'static Name {
|
||||
b"prof.active\0".as_slice().name()
|
||||
}
|
||||
}
|
||||
|
||||
impl active {
|
||||
pub fn read() -> tikv_jemalloc_ctl::Result<bool> {
|
||||
Self::name().read()
|
||||
}
|
||||
pub fn write(value: bool) -> tikv_jemalloc_ctl::Result<()> {
|
||||
Self::name().write(value)
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
pub struct prof;
|
||||
|
||||
impl prof {
|
||||
pub fn name() -> &'static Name {
|
||||
b"opt.prof\0".as_slice().name()
|
||||
}
|
||||
}
|
||||
|
||||
impl prof {
|
||||
pub fn read() -> tikv_jemalloc_ctl::Result<bool> {
|
||||
Self::name().read()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@ psutil = "^5.9.4"
|
||||
types-psutil = "^5.9.5.12"
|
||||
types-toml = "^0.10.8.6"
|
||||
pytest-httpserver = "^1.0.8"
|
||||
aiohttp = "3.9.2"
|
||||
aiohttp = "3.9.4"
|
||||
pytest-rerunfailures = "^13.0"
|
||||
types-pytest-lazy-fixture = "^0.6.3.3"
|
||||
pytest-split = "^0.8.1"
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import json
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
@@ -582,6 +583,91 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
)
|
||||
|
||||
|
||||
def test_secondary_background_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Slow test that runs in realtime, checks that the background scheduling of secondary
|
||||
downloads happens as expected.
|
||||
"""
|
||||
neon_env_builder.num_pageservers = 2
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
# Create this many tenants, each with two timelines
|
||||
tenant_count = 4
|
||||
tenant_timelines = {}
|
||||
|
||||
# This mirrors a constant in `downloader.rs`
|
||||
freshen_interval_secs = 60
|
||||
|
||||
for _i in range(0, tenant_count):
|
||||
tenant_id = TenantId.generate()
|
||||
timeline_a = TimelineId.generate()
|
||||
timeline_b = TimelineId.generate()
|
||||
env.neon_cli.create_tenant(
|
||||
tenant_id,
|
||||
timeline_a,
|
||||
placement_policy='{"Attached":1}',
|
||||
# Run with a low heatmap period so that we can avoid having to do synthetic API calls
|
||||
# to trigger the upload promptly.
|
||||
conf={"heatmap_period": "1s"},
|
||||
)
|
||||
env.neon_cli.create_timeline("main2", tenant_id, timeline_b)
|
||||
|
||||
tenant_timelines[tenant_id] = [timeline_a, timeline_b]
|
||||
|
||||
t_start = time.time()
|
||||
|
||||
# Wait long enough that the background downloads should happen; we expect all the inital layers
|
||||
# of all the initial timelines to show up on the secondary location of each tenant.
|
||||
time.sleep(freshen_interval_secs * 1.5)
|
||||
|
||||
for tenant_id, timelines in tenant_timelines.items():
|
||||
attached_to_id = env.storage_controller.locate(tenant_id)[0]["node_id"]
|
||||
ps_attached = env.get_pageserver(attached_to_id)
|
||||
# We only have two: the other one must be secondary
|
||||
ps_secondary = next(p for p in env.pageservers if p != ps_attached)
|
||||
|
||||
for timeline_id in timelines:
|
||||
log.info(f"Checking for secondary timeline {timeline_id} on node {ps_secondary.id}")
|
||||
# One or more layers should be present for all timelines
|
||||
assert list_layers(ps_secondary, tenant_id, timeline_id)
|
||||
|
||||
# Delete the second timeline: this should be reflected later on the secondary
|
||||
env.storage_controller.pageserver_api().timeline_delete(tenant_id, timelines[1])
|
||||
|
||||
# Wait long enough for the secondary locations to see the deletion
|
||||
time.sleep(freshen_interval_secs * 1.5)
|
||||
|
||||
for tenant_id, timelines in tenant_timelines.items():
|
||||
attached_to_id = env.storage_controller.locate(tenant_id)[0]["node_id"]
|
||||
ps_attached = env.get_pageserver(attached_to_id)
|
||||
# We only have two: the other one must be secondary
|
||||
ps_secondary = next(p for p in env.pageservers if p != ps_attached)
|
||||
|
||||
# This one was not deleted
|
||||
assert list_layers(ps_secondary, tenant_id, timelines[0])
|
||||
|
||||
# This one was deleted
|
||||
assert not list_layers(ps_secondary, tenant_id, timelines[1])
|
||||
|
||||
t_end = time.time()
|
||||
|
||||
# Measure how many heatmap downloads we did in total: this checks that we succeeded with
|
||||
# proper scheduling, and not some bug that just runs downloads in a loop.
|
||||
total_heatmap_downloads = 0
|
||||
for ps in env.pageservers:
|
||||
v = ps.http_client().get_metric_value("pageserver_secondary_download_heatmap_total")
|
||||
assert v is not None
|
||||
total_heatmap_downloads += int(v)
|
||||
|
||||
download_rate = (total_heatmap_downloads / tenant_count) / (t_end - t_start)
|
||||
|
||||
expect_download_rate = 1.0 / freshen_interval_secs
|
||||
log.info(f"Download rate: {download_rate * 60}/min vs expected {expect_download_rate * 60}/min")
|
||||
|
||||
assert download_rate < expect_download_rate * 2
|
||||
|
||||
|
||||
@pytest.mark.skipif(os.environ.get("BUILD_TYPE") == "debug", reason="only run with release build")
|
||||
@pytest.mark.parametrize("via_controller", [True, False])
|
||||
def test_slow_secondary_downloads(neon_env_builder: NeonEnvBuilder, via_controller: bool):
|
||||
|
||||
@@ -273,7 +273,8 @@ def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up
|
||||
but imports the generation number.
|
||||
"""
|
||||
|
||||
neon_env_builder.num_pageservers = 2
|
||||
# One pageserver to simulate legacy environment, two to be managed by storage controller
|
||||
neon_env_builder.num_pageservers = 3
|
||||
|
||||
# Start services by hand so that we can skip registration on one of the pageservers
|
||||
env = neon_env_builder.init_configs()
|
||||
@@ -288,10 +289,10 @@ def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up
|
||||
)
|
||||
origin_ps = env.pageservers[0]
|
||||
|
||||
# This is the pageserver managed by the sharding service, where the tenant
|
||||
# These are the pageservers managed by the sharding service, where the tenant
|
||||
# will be attached after onboarding
|
||||
env.pageservers[1].start()
|
||||
dest_ps = env.pageservers[1]
|
||||
env.pageservers[2].start()
|
||||
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
|
||||
|
||||
for sk in env.safekeepers:
|
||||
@@ -330,6 +331,9 @@ def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up
|
||||
)
|
||||
|
||||
virtual_ps_http.tenant_secondary_download(tenant_id)
|
||||
warm_up_ps = env.storage_controller.tenant_describe(tenant_id)["shards"][0][
|
||||
"node_secondary"
|
||||
][0]
|
||||
|
||||
# Call into storage controller to onboard the tenant
|
||||
generation += 1
|
||||
@@ -344,6 +348,18 @@ def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up
|
||||
)
|
||||
assert len(r["shards"]) == 1
|
||||
|
||||
describe = env.storage_controller.tenant_describe(tenant_id)["shards"][0]
|
||||
dest_ps_id = describe["node_attached"]
|
||||
dest_ps = env.get_pageserver(dest_ps_id)
|
||||
if warm_up:
|
||||
# The storage controller should have attached the tenant to the same placce
|
||||
# it had a secondary location, otherwise there was no point warming it up
|
||||
assert dest_ps_id == warm_up_ps
|
||||
|
||||
# It should have been given a new secondary location as well
|
||||
assert len(describe["node_secondary"]) == 1
|
||||
assert describe["node_secondary"][0] != warm_up_ps
|
||||
|
||||
# As if doing a live migration, detach the original pageserver
|
||||
origin_ps.http_client().tenant_location_conf(
|
||||
tenant_id,
|
||||
@@ -415,6 +431,9 @@ def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up
|
||||
dest_tenant_after_conf_change["generation"] == dest_tenant_before_conf_change["generation"]
|
||||
)
|
||||
dest_tenant_conf_after = dest_ps.http_client().tenant_config(tenant_id)
|
||||
|
||||
# Storage controller auto-sets heatmap period, ignore it for the comparison
|
||||
del dest_tenant_conf_after.tenant_specific_overrides["heatmap_period"]
|
||||
assert dest_tenant_conf_after.tenant_specific_overrides == modified_tenant_conf
|
||||
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
Reference in New Issue
Block a user