Merge remote-tracking branch 'upstream/jcsp/tenant-snapshot' into jcsp/hack

This commit is contained in:
John Spray
2024-04-22 10:16:41 +01:00
35 changed files with 1828 additions and 221 deletions

1
Cargo.lock generated
View File

@@ -5084,6 +5084,7 @@ dependencies = [
"aws-smithy-async",
"bincode",
"bytes",
"camino",
"chrono",
"clap",
"crc32c",

View File

@@ -1,15 +1,15 @@
use std::{collections::HashMap, str::FromStr};
use std::{collections::HashMap, str::FromStr, time::Duration};
use clap::{Parser, Subcommand};
use hyper::Method;
use hyper::{Method, StatusCode};
use pageserver_api::{
controller_api::{
NodeAvailabilityWrapper, NodeDescribeResponse, ShardSchedulingPolicy,
TenantDescribeResponse, TenantPolicyRequest,
},
models::{
ShardParameters, TenantConfig, TenantConfigRequest, TenantCreateRequest,
TenantShardSplitRequest, TenantShardSplitResponse,
LocationConfigSecondary, ShardParameters, TenantConfig, TenantConfigRequest,
TenantCreateRequest, TenantShardSplitRequest, TenantShardSplitResponse,
},
shard::{ShardStripeSize, TenantShardId},
};
@@ -120,6 +120,12 @@ enum Command {
#[arg(long)]
tenant_id: TenantId,
},
/// For a tenant which hasn't been onboarded to the storage controller yet, add it in secondary
/// mode so that it can warm up content on a pageserver.
TenantWarmup {
#[arg(long)]
tenant_id: TenantId,
},
}
#[derive(Parser)]
@@ -581,6 +587,94 @@ async fn main() -> anyhow::Result<()> {
}
println!("{table}");
}
Command::TenantWarmup { tenant_id } => {
let describe_response = storcon_client
.dispatch::<(), TenantDescribeResponse>(
Method::GET,
format!("control/v1/tenant/{tenant_id}"),
None,
)
.await;
match describe_response {
Ok(describe) => {
if matches!(describe.policy, PlacementPolicy::Secondary) {
// Fine: it's already known to controller in secondary mode: calling
// again to put it into secondary mode won't cause problems.
} else {
anyhow::bail!("Tenant already present with policy {:?}", describe.policy);
}
}
Err(mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _)) => {
// Fine: this tenant isn't know to the storage controller yet.
}
Err(e) => {
// Unexpected API error
return Err(e.into());
}
}
vps_client
.location_config(
TenantShardId::unsharded(tenant_id),
pageserver_api::models::LocationConfig {
mode: pageserver_api::models::LocationConfigMode::Secondary,
generation: None,
secondary_conf: Some(LocationConfigSecondary { warm: true }),
shard_number: 0,
shard_count: 0,
shard_stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE.0,
tenant_conf: TenantConfig::default(),
},
None,
true,
)
.await?;
let describe_response = storcon_client
.dispatch::<(), TenantDescribeResponse>(
Method::GET,
format!("control/v1/tenant/{tenant_id}"),
None,
)
.await?;
let secondary_ps_id = describe_response
.shards
.first()
.unwrap()
.node_secondary
.first()
.unwrap();
println!("Tenant {tenant_id} warming up on pageserver {secondary_ps_id}");
loop {
let (status, progress) = vps_client
.tenant_secondary_download(
TenantShardId::unsharded(tenant_id),
Some(Duration::from_secs(10)),
)
.await?;
println!(
"Progress: {}/{} layers, {}/{} bytes",
progress.layers_downloaded,
progress.layers_total,
progress.bytes_downloaded,
progress.bytes_total
);
match status {
StatusCode::OK => {
println!("Download complete");
break;
}
StatusCode::ACCEPTED => {
// Loop
}
_ => {
anyhow::bail!("Unexpected download status: {status}");
}
}
}
}
}
Ok(())

150
docs/storage_controller.md Normal file
View File

@@ -0,0 +1,150 @@
# Storage Controller
## Concepts
The storage controller sits between administrative API clients and pageservers, and handles the details of mapping tenants to pageserver tenant shards. For example, creating a tenant is one API call to the storage controller,
which is mapped into many API calls to many pageservers (for multiple shards, and for secondary locations).
It implements a pageserver-compatible API that may be used for CRUD operations on tenants and timelines, translating these requests into appropriate operations on the shards within a tenant, which may be on many different pageservers. Using this API, the storage controller may be used in the same way as the pageserver's administrative HTTP API, hiding
the underlying details of how data is spread across multiple nodes.
The storage controller also manages generations, high availability (via secondary locations) and live migrations for tenants under its management. This is done with a reconciliation loop pattern, where tenants have an “intent” state and a “reconcile” task that tries to make the outside world match the intent.
## APIs
The storage controllers HTTP server implements four logically separate APIs:
- `/v1/...` path is the pageserver-compatible API. This has to be at the path root because thats where clients expect to find it on a pageserver.
- `/control/v1/...` path is the storage controllers API, which enables operations such as registering and management pageservers, or executing shard splits.
- `/debug/v1/...` path contains endpoints which are either exclusively used in tests, or are for use by engineers when supporting a deployed system.
- `/upcall/v1/...` path contains endpoints that are called by pageservers. This includes the `/re-attach` and `/validate` APIs used by pageservers
to ensure data safety with generation numbers.
The API is authenticated with a JWT token, and tokens must have scope `pageserverapi` (i.e. the same scope as pageservers APIs).
See the `http.rs` file in the source for where the HTTP APIs are implemented.
## Database
The storage controller uses a postgres database to persist a subset of its state. Note that the storage controller does _not_ keep all its state in the database: this is a design choice to enable most operations to be done efficiently in memory, rather than having to read from the database. See `persistence.rs` for a more comprehensive comment explaining what we do and do not persist: a useful metaphor is that we persist objects like tenants and nodes, but we do not
persist the _relationships_ between them: the attachment state of a tenant's shards to nodes is kept in memory and
rebuilt on startup.
The file `[persistence.rs](http://persistence.rs)` contains all the code for accessing the database, and has a large doc comment that goes into more detail about exactly what we persist and why.
The `diesel` crate is used for defining models & migrations.
Running a local cluster with `cargo neon` automatically starts a vanilla postgress process to host the storage controllers database.
### Diesel tip: migrations
If you need to modify the database schema, heres how to create a migration:
- Install the diesel CLI with `cargo install diesel_cli`
- Use `diesel migration generate <name>` to create a new migration
- Populate the SQL files in the `migrations/` subdirectory
- Use `DATABASE_URL=... diesel migration run` to apply the migration you just wrote: this will update the `[schema.rs](http://schema.rs)` file automatically.
- This requires a running database: the easiest way to do that is to just run `cargo neon init ; cargo neon start`, which will leave a database available at `postgresql://localhost:1235/attachment_service`
- Commit the migration files and the changes to schema.rs
- If you need to iterate, you can rewind migrations with `diesel migration revert -a` and then `diesel migration run` again.
- The migrations are build into the storage controller binary, and automatically run at startup after it is deployed, so once youve committed a migration no further steps are needed.
## storcon_cli
The `storcon_cli` tool enables interactive management of the storage controller. This is usually
only necessary for debug, but may also be used to manage nodes (e.g. marking a node as offline).
`storcon_cli --help` includes details on commands.
# Deploying
This section is aimed at engineers deploying the storage controller outside of Neon's cloud platform, as
part of a self-hosted system.
_General note: since the default `neon_local` environment includes a storage controller, this is a useful
reference when figuring out deployment._
## Database
It is **essential** that the database used by the storage controller is durable (**do not store it on ephemeral
local disk**). This database contains pageserver generation numbers, which are essential to data safety on the pageserver.
The resource requirements for the database are very low: a single CPU core and 1GiB of memory should work well for most deployments. The physical size of the database is typically under a gigabyte.
Set the URL to the database using the `--database-url` CLI option.
There is no need to run migrations manually: the storage controller automatically applies migrations
when it starts up.
## Configure pageservers to use the storage controller
1. The pageserver `control_plane_api` and `control_plane_api_token` should be set in the `pageserver.toml` file. The API setting should
point to the "upcall" prefix, for example `http://127.0.0.1:1234/upcall/v1/` is used in neon_local clusters.
2. Create a `metadata.json` file in the same directory as `pageserver.toml`: this enables the pageserver to automatically register itself
with the storage controller when it starts up. See the example below for the format of this file.
### Example `metadata.json`
```
{"host":"acmehost.localdomain","http_host":"acmehost.localdomain","http_port":9898,"port":64000}
```
- `port` and `host` refer to the _postgres_ port and host, and these must be accessible from wherever
postgres runs.
- `http_port` and `http_host` refer to the pageserver's HTTP api, this must be accessible from where
the storage controller runs.
## Handle compute notifications.
The storage controller independently moves tenant attachments between pageservers in response to
changes such as a pageserver node becoming unavailable, or the tenant's shard count changing. To enable
postgres clients to handle such changes, the storage controller calls an API hook when a tenant's pageserver
location changes.
The hook is configured using the storage controller's `--compute-hook-url` CLI option. If the hook requires
JWT auth, the token may be provided with `--control-plane-jwt-token`. The hook will be invoked with a `PUT` request.
In the Neon cloud service, this hook is implemented by Neon's internal cloud control plane. In `neon_local` systems
the storage controller integrates directly with neon_local to reconfigure local postgres processes instead of calling
the compute hook.
When implementing an on-premise Neon deployment, you must implement a service that handles the compute hook. This is not complicated:
the request body has format of the `ComputeHookNotifyRequest` structure, provided below for convenience.
```
struct ComputeHookNotifyRequestShard {
node_id: NodeId,
shard_number: ShardNumber,
}
struct ComputeHookNotifyRequest {
tenant_id: TenantId,
stripe_size: Option<ShardStripeSize>,
shards: Vec<ComputeHookNotifyRequestShard>,
}
```
When a notification is received:
1. Modify postgres configuration for this tenant:
- set `neon.pageserver_connstr` to a comma-separated list of postgres connection strings to pageservers according to the `shards` list. The
shards identified by `NodeId` must be converted to the address+port of the node.
- if stripe_size is not None, set `neon.stripe_size` to this value
2. Send SIGHUP to postgres to reload configuration
3. Respond with 200 to the notification request. Do not return success if postgres was not updated: if an error is returned, the controller
will retry the notification until it succeeds..
### Example notification body
```
{
"tenant_id": "1f359dd625e519a1a4e8d7509690f6fc",
"stripe_size": 32768,
"shards": [
{"node_id": 344, "shard_number": 0},
{"node_id": 722, "shard_number": 1},
],
}
```

View File

@@ -192,6 +192,14 @@ impl<T> OnceCell<T> {
}
}
/// Like [`Guard::take_and_deinit`], but will return `None` if this OnceCell was never
/// initialized.
pub fn take_and_deinit(&mut self) -> Option<(T, InitPermit)> {
let inner = self.inner.get_mut().unwrap();
inner.take_and_deinit()
}
/// Return the number of [`Self::get_or_init`] calls waiting for initialization to complete.
pub fn initializer_count(&self) -> usize {
self.initializers.load(Ordering::Relaxed)
@@ -246,15 +254,23 @@ impl<'a, T> Guard<'a, T> {
/// The permit will be on a semaphore part of the new internal value, and any following
/// [`OnceCell::get_or_init`] will wait on it to complete.
pub fn take_and_deinit(mut self) -> (T, InitPermit) {
self.0
.take_and_deinit()
.expect("guard is not created unless value has been initialized")
}
}
impl<T> Inner<T> {
pub fn take_and_deinit(&mut self) -> Option<(T, InitPermit)> {
let value = self.value.take()?;
let mut swapped = Inner::default();
let sem = swapped.init_semaphore.clone();
// acquire and forget right away, moving the control over to InitPermit
sem.try_acquire().expect("we just created this").forget();
std::mem::swap(&mut *self.0, &mut swapped);
swapped
.value
.map(|v| (v, InitPermit(sem)))
.expect("guard is not created unless value has been initialized")
let permit = InitPermit(sem);
std::mem::swap(self, &mut swapped);
Some((value, permit))
}
}
@@ -263,6 +279,13 @@ impl<'a, T> Guard<'a, T> {
/// On drop, this type will return the permit.
pub struct InitPermit(Arc<tokio::sync::Semaphore>);
impl std::fmt::Debug for InitPermit {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let ptr = Arc::as_ptr(&self.0) as *const ();
f.debug_tuple("InitPermit").field(&ptr).finish()
}
}
impl Drop for InitPermit {
fn drop(&mut self) {
assert_eq!(
@@ -559,4 +582,22 @@ mod tests {
assert_eq!(*target.get().unwrap(), 11);
}
#[tokio::test]
async fn take_and_deinit_on_mut() {
use std::convert::Infallible;
let mut target = OnceCell::<u32>::default();
assert!(target.take_and_deinit().is_none());
target
.get_or_init(|permit| async move { Ok::<_, Infallible>((42, permit)) })
.await
.unwrap();
let again = target.take_and_deinit();
assert!(matches!(again, Some((42, _))), "{again:?}");
assert!(target.take_and_deinit().is_none());
}
}

View File

@@ -1518,7 +1518,8 @@ pub(crate) struct SecondaryModeMetrics {
pub(crate) download_heatmap: IntCounter,
pub(crate) download_layer: IntCounter,
}
pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| SecondaryModeMetrics {
pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| {
SecondaryModeMetrics {
upload_heatmap: register_int_counter!(
"pageserver_secondary_upload_heatmap",
"Number of heatmaps written to remote storage by attached tenants"
@@ -1536,7 +1537,7 @@ pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| Seco
.expect("failed to define a metric"),
download_heatmap: register_int_counter!(
"pageserver_secondary_download_heatmap",
"Number of downloads of heatmaps by secondary mode locations"
"Number of downloads of heatmaps by secondary mode locations, including when it hasn't changed"
)
.expect("failed to define a metric"),
download_layer: register_int_counter!(
@@ -1544,6 +1545,7 @@ pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| Seco
"Number of downloads of layers by secondary mode locations"
)
.expect("failed to define a metric"),
}
});
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]

View File

@@ -33,6 +33,52 @@ impl Value {
}
}
#[cfg(test)]
#[derive(Debug, PartialEq)]
pub(crate) enum InvalidInput {
TooShortValue,
TooShortPostgresRecord,
}
/// We could have a ValueRef where everything is `serde(borrow)`. Before implementing that, lets
/// use this type for querying if a slice looks some particular way.
#[cfg(test)]
pub(crate) struct ValueBytes;
#[cfg(test)]
impl ValueBytes {
pub(crate) fn will_init(raw: &[u8]) -> Result<bool, InvalidInput> {
if raw.len() < 12 {
return Err(InvalidInput::TooShortValue);
}
let value_discriminator = &raw[0..4];
if value_discriminator == [0, 0, 0, 0] {
// Value::Image always initializes
return Ok(true);
}
if value_discriminator != [0, 0, 0, 1] {
// not a Value::WalRecord(..)
return Ok(false);
}
let walrecord_discriminator = &raw[4..8];
if walrecord_discriminator != [0, 0, 0, 0] {
// only NeonWalRecord::Postgres can have will_init
return Ok(false);
}
if raw.len() < 17 {
return Err(InvalidInput::TooShortPostgresRecord);
}
Ok(raw[8] == 1)
}
}
#[cfg(test)]
mod test {
use super::*;
@@ -70,6 +116,8 @@ mod test {
];
roundtrip!(image, expected);
assert!(ValueBytes::will_init(&expected).unwrap());
}
#[test]
@@ -93,6 +141,96 @@ mod test {
];
roundtrip!(rec, expected);
assert!(ValueBytes::will_init(&expected).unwrap());
}
#[test]
fn bytes_inspection_too_short_image() {
let rec = Value::Image(Bytes::from_static(b""));
#[rustfmt::skip]
let expected = [
// top level discriminator of 4 bytes
0x00, 0x00, 0x00, 0x00,
// 8 byte length
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
];
roundtrip!(rec, expected);
assert!(ValueBytes::will_init(&expected).unwrap());
assert_eq!(expected.len(), 12);
for len in 0..12 {
assert_eq!(
ValueBytes::will_init(&expected[..len]).unwrap_err(),
InvalidInput::TooShortValue
);
}
}
#[test]
fn bytes_inspection_too_short_postgres_record() {
let rec = NeonWalRecord::Postgres {
will_init: false,
rec: Bytes::from_static(b""),
};
let rec = Value::WalRecord(rec);
#[rustfmt::skip]
let expected = [
// flattened discriminator of total 8 bytes
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x00,
// will_init
0x00,
// 8 byte length
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
];
roundtrip!(rec, expected);
assert!(!ValueBytes::will_init(&expected).unwrap());
assert_eq!(expected.len(), 17);
for len in 12..17 {
assert_eq!(
ValueBytes::will_init(&expected[..len]).unwrap_err(),
InvalidInput::TooShortPostgresRecord
)
}
for len in 0..12 {
assert_eq!(
ValueBytes::will_init(&expected[..len]).unwrap_err(),
InvalidInput::TooShortValue
)
}
}
#[test]
fn clear_visibility_map_flags_example() {
let rec = NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno: Some(0x11),
old_heap_blkno: None,
flags: 0x03,
};
let rec = Value::WalRecord(rec);
#[rustfmt::skip]
let expected = [
// discriminators
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x01,
// Some == 1 followed by 4 bytes
0x01, 0x00, 0x00, 0x00, 0x11,
// None == 0
0x00,
// flags
0x03
];
roundtrip!(rec, expected);
assert!(!ValueBytes::will_init(&expected).unwrap());
}
}

View File

@@ -3852,6 +3852,8 @@ pub(crate) mod harness {
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use super::*;
use crate::keyspace::KeySpaceAccum;
use crate::repository::{Key, Value};
@@ -3862,7 +3864,7 @@ mod tests {
use hex_literal::hex;
use pageserver_api::keyspace::KeySpace;
use rand::{thread_rng, Rng};
use tests::timeline::ShutdownMode;
use tests::timeline::{GetVectoredError, ShutdownMode};
static TEST_KEY: Lazy<Key> =
Lazy::new(|| Key::from_slice(&hex!("010000000033333333444444445500000001")));
@@ -4798,6 +4800,166 @@ mod tests {
Ok(())
}
// Test that vectored get descends into ancestor timelines correctly and
// does not return an image that's newer than requested.
//
// The diagram below ilustrates an interesting case. We have a parent timeline
// (top of the Lsn range) and a child timeline. The request key cannot be reconstructed
// from the child timeline, so the parent timeline must be visited. When advacing into
// the child timeline, the read path needs to remember what the requested Lsn was in
// order to avoid returning an image that's too new. The test below constructs such
// a timeline setup and does a few queries around the Lsn of each page image.
// ```
// LSN
// ^
// |
// |
// 500 | --------------------------------------> branch point
// 400 | X
// 300 | X
// 200 | --------------------------------------> requested lsn
// 100 | X
// |---------------------------------------> Key
// |
// ------> requested key
//
// Legend:
// * X - page images
// ```
#[tokio::test]
async fn test_get_vectored_ancestor_descent() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_get_vectored_on_lsn_axis")?;
let (tenant, ctx) = harness.load().await;
let start_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
let end_key = start_key.add(1000);
let child_gap_at_key = start_key.add(500);
let mut parent_gap_lsns: BTreeMap<Lsn, String> = BTreeMap::new();
let mut current_lsn = Lsn(0x10);
let timeline_id = TimelineId::generate();
let parent_timeline = tenant
.create_test_timeline(timeline_id, current_lsn, DEFAULT_PG_VERSION, &ctx)
.await?;
current_lsn += 0x100;
for _ in 0..3 {
let mut key = start_key;
while key < end_key {
current_lsn += 0x10;
let image_value = format!("{} at {}", child_gap_at_key, current_lsn);
let mut writer = parent_timeline.writer().await;
writer
.put(
key,
current_lsn,
&Value::Image(test_img(&image_value)),
&ctx,
)
.await?;
writer.finish_write(current_lsn);
if key == child_gap_at_key {
parent_gap_lsns.insert(current_lsn, image_value);
}
key = key.next();
}
parent_timeline.freeze_and_flush().await?;
}
let child_timeline_id = TimelineId::generate();
let child_timeline = tenant
.branch_timeline_test(&parent_timeline, child_timeline_id, Some(current_lsn), &ctx)
.await?;
let mut key = start_key;
while key < end_key {
if key == child_gap_at_key {
key = key.next();
continue;
}
current_lsn += 0x10;
let mut writer = child_timeline.writer().await;
writer
.put(
key,
current_lsn,
&Value::Image(test_img(&format!("{} at {}", key, current_lsn))),
&ctx,
)
.await?;
writer.finish_write(current_lsn);
key = key.next();
}
child_timeline.freeze_and_flush().await?;
let lsn_offsets: [i64; 5] = [-10, -1, 0, 1, 10];
let mut query_lsns = Vec::new();
for image_lsn in parent_gap_lsns.keys().rev() {
for offset in lsn_offsets {
query_lsns.push(Lsn(image_lsn
.0
.checked_add_signed(offset)
.expect("Shouldn't overflow")));
}
}
for query_lsn in query_lsns {
let results = child_timeline
.get_vectored_impl(
KeySpace {
ranges: vec![child_gap_at_key..child_gap_at_key.next()],
},
query_lsn,
&ctx,
)
.await;
let expected_item = parent_gap_lsns
.iter()
.rev()
.find(|(lsn, _)| **lsn <= query_lsn);
info!(
"Doing vectored read at LSN {}. Expecting image to be: {:?}",
query_lsn, expected_item
);
match expected_item {
Some((_, img_value)) => {
let key_results = results.expect("No vectored get error expected");
let key_result = &key_results[&child_gap_at_key];
let returned_img = key_result
.as_ref()
.expect("No page reconstruct error expected");
info!(
"Vectored read at LSN {} returned image {}",
query_lsn,
std::str::from_utf8(returned_img)?
);
assert_eq!(*returned_img, test_img(img_value));
}
None => {
assert!(matches!(results, Err(GetVectoredError::MissingKey(_))));
}
}
}
Ok(())
}
#[tokio::test]
async fn test_random_updates() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_random_updates")?;

View File

@@ -678,12 +678,19 @@ pub async fn init_tenant_mgr(
}
}
}
LocationMode::Secondary(secondary_conf) => TenantSlot::Secondary(SecondaryTenant::new(
tenant_shard_id,
shard_identity,
location_conf.tenant_conf,
&secondary_conf,
)),
LocationMode::Secondary(secondary_conf) => {
info!(
tenant_id = %tenant_shard_id.tenant_id,
shard_id = %tenant_shard_id.shard_slug(),
"Starting secondary tenant"
);
TenantSlot::Secondary(SecondaryTenant::new(
tenant_shard_id,
shard_identity,
location_conf.tenant_conf,
&secondary_conf,
))
}
};
tenants.insert(tenant_shard_id, slot);

View File

@@ -312,7 +312,7 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
(detail.last_download, detail.next_download.unwrap())
};
if now < next_download {
if now > next_download {
Some(PendingDownload {
secondary_state: secondary_tenant,
last_download,
@@ -647,6 +647,12 @@ impl<'a> TenantDownloader<'a> {
progress.bytes_downloaded += layer_byte_count;
progress.layers_downloaded += layer_count;
}
for delete_timeline in &delete_timelines {
// We haven't removed from disk yet, but optimistically remove from in-memory state: if removal
// from disk fails that will be a fatal error.
detail.timelines.remove(delete_timeline);
}
}
// Execute accumulated deletions
@@ -710,13 +716,14 @@ impl<'a> TenantDownloader<'a> {
.await
.map_err(UpdateError::from)?;
SECONDARY_MODE.download_heatmap.inc();
if Some(&download.etag) == prev_etag {
Ok(HeatMapDownload::Unmodified)
} else {
let mut heatmap_bytes = Vec::new();
let mut body = tokio_util::io::StreamReader::new(download.download_stream);
let _size = tokio::io::copy_buf(&mut body, &mut heatmap_bytes).await?;
SECONDARY_MODE.download_heatmap.inc();
Ok(HeatMapDownload::Modified(HeatMapModified {
etag: download.etag,
last_modified: download.last_modified,

View File

@@ -20,8 +20,8 @@
//! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
//! ```
//!
//! Every delta file consists of three parts: "summary", "index", and
//! "values". The summary is a fixed size header at the beginning of the file,
//! Every delta file consists of three parts: "summary", "values", and
//! "index". The summary is a fixed size header at the beginning of the file,
//! and it contains basic information about the layer, and offsets to the other
//! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
//! "values" part. The actual page images and WAL records are stored in the
@@ -863,7 +863,7 @@ impl DeltaLayerInner {
.into(),
);
let data_end_offset = self.index_start_blk as u64 * PAGE_SZ as u64;
let data_end_offset = self.index_start_offset();
let reads = Self::plan_reads(
keyspace,
@@ -1103,11 +1103,195 @@ impl DeltaLayerInner {
if let Some(last) = all_keys.last_mut() {
// Last key occupies all space till end of value storage,
// which corresponds to beginning of the index
last.size = self.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
last.size = self.index_start_offset() - last.size;
}
Ok(all_keys)
}
/// Using the given writer, write out a truncated version, where LSNs higher than the
/// truncate_at are missing.
#[cfg(test)]
pub(super) async fn copy_prefix(
&self,
writer: &mut DeltaLayerWriter,
truncate_at: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<()> {
use crate::tenant::vectored_blob_io::{
BlobMeta, VectoredReadBuilder, VectoredReadExtended,
};
use futures::stream::TryStreamExt;
#[derive(Debug)]
enum Item {
Actual(Key, Lsn, BlobRef),
Sentinel,
}
impl From<Item> for Option<(Key, Lsn, BlobRef)> {
fn from(value: Item) -> Self {
match value {
Item::Actual(key, lsn, blob) => Some((key, lsn, blob)),
Item::Sentinel => None,
}
}
}
impl Item {
fn offset(&self) -> Option<BlobRef> {
match self {
Item::Actual(_, _, blob) => Some(*blob),
Item::Sentinel => None,
}
}
fn is_last(&self) -> bool {
matches!(self, Item::Sentinel)
}
}
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
self.index_start_blk,
self.index_root_blk,
block_reader,
);
let stream = self.stream_index_forwards(&tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
// put in a sentinel value for getting the end offset for last item, and not having to
// repeat the whole read part
let stream = stream.chain(futures::stream::once(futures::future::ready(Ok(
Item::Sentinel,
))));
let mut stream = std::pin::pin!(stream);
let mut prev: Option<(Key, Lsn, BlobRef)> = None;
let mut read_builder: Option<VectoredReadBuilder> = None;
let max_read_size = self
.max_vectored_read_bytes
.map(|x| x.0.get())
.unwrap_or(8192);
let mut buffer = Some(BytesMut::with_capacity(max_read_size));
// FIXME: buffering of DeltaLayerWriter
let mut per_blob_copy = Vec::new();
while let Some(item) = stream.try_next().await? {
tracing::debug!(?item, "popped");
let offset = item
.offset()
.unwrap_or(BlobRef::new(self.index_start_offset(), false));
let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
let end_offset = offset;
Some((BlobMeta { key, lsn }, start_offset..end_offset))
} else {
None
};
let is_last = item.is_last();
prev = Option::from(item);
let actionable = actionable.filter(|x| x.0.lsn < truncate_at);
let builder = if let Some((meta, offsets)) = actionable {
// extend or create a new builder
if read_builder
.as_mut()
.map(|x| x.extend(offsets.start.pos(), offsets.end.pos(), meta))
.unwrap_or(VectoredReadExtended::No)
== VectoredReadExtended::Yes
{
None
} else {
read_builder.replace(VectoredReadBuilder::new(
offsets.start.pos(),
offsets.end.pos(),
meta,
max_read_size,
))
}
} else {
// nothing to do, except perhaps flush any existing for the last element
None
};
// flush the possible older builder and also the new one if the item was the last one
let builders = builder.into_iter();
let builders = if is_last {
builders.chain(read_builder.take())
} else {
builders.chain(None)
};
for builder in builders {
let read = builder.build();
let reader = VectoredBlobReader::new(&self.file);
let mut buf = buffer.take().unwrap();
buf.clear();
buf.reserve(read.size());
let res = reader.read_blobs(&read, buf).await?;
for blob in res.blobs {
let key = blob.meta.key;
let lsn = blob.meta.lsn;
let data = &res.buf[blob.start..blob.end];
#[cfg(debug_assertions)]
Value::des(data)
.with_context(|| {
format!(
"blob failed to deserialize for {}@{}, {}..{}: {:?}",
blob.meta.key,
blob.meta.lsn,
blob.start,
blob.end,
utils::Hex(data)
)
})
.unwrap();
// is it an image or will_init walrecord?
// FIXME: this could be handled by threading the BlobRef to the
// VectoredReadBuilder
let will_init = crate::repository::ValueBytes::will_init(data)
.inspect_err(|_e| {
#[cfg(feature = "testing")]
tracing::error!(data=?utils::Hex(data), err=?_e, "failed to parse will_init out of serialized value");
})
.unwrap_or(false);
per_blob_copy.clear();
per_blob_copy.extend_from_slice(data);
let (tmp, res) = writer
.put_value_bytes(key, lsn, std::mem::take(&mut per_blob_copy), will_init)
.await;
per_blob_copy = tmp;
res?;
}
buffer = Some(res.buf);
}
}
assert!(
read_builder.is_none(),
"with the sentinel above loop should had handled all"
);
Ok(())
}
pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
println!(
"index_start_blk: {}, root {}",
@@ -1177,6 +1361,44 @@ impl DeltaLayerInner {
Ok(())
}
#[cfg(test)]
fn stream_index_forwards<'a, R>(
&'a self,
reader: &'a DiskBtreeReader<R, DELTA_KEY_SIZE>,
start: &'a [u8; DELTA_KEY_SIZE],
ctx: &'a RequestContext,
) -> impl futures::stream::Stream<
Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
> + 'a
where
R: BlockReader,
{
use futures::stream::TryStreamExt;
let stream = reader.get_stream_from(start, ctx);
stream.map_ok(|(key, value)| {
let key = DeltaKey::from_slice(&key);
let (key, lsn) = (key.key(), key.lsn());
let offset = BlobRef(value);
(key, lsn, offset)
})
}
/// The file offset to the first block of index.
///
/// The file structure is summary, values, and index. We often need this for the size of last blob.
fn index_start_offset(&self) -> u64 {
let offset = self.index_start_blk as u64 * PAGE_SZ as u64;
let bref = BlobRef(offset);
tracing::debug!(
index_start_blk = self.index_start_blk,
offset,
pos = bref.pos(),
"index_start_offset"
);
offset
}
}
/// A set of data associated with a delta layer key and its value
@@ -1538,7 +1760,7 @@ mod test {
let resident = writer.finish(entries_meta.key_range.end, &timeline).await?;
let inner = resident.get_inner_delta(&ctx).await?;
let inner = resident.as_delta(&ctx).await?;
let file_size = inner.file.metadata().await?.len();
tracing::info!(
@@ -1594,4 +1816,217 @@ mod test {
Ok(())
}
#[tokio::test]
async fn copy_delta_prefix_smoke() {
use crate::walrecord::NeonWalRecord;
use bytes::Bytes;
let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke").unwrap();
let (tenant, ctx) = h.load().await;
let ctx = &ctx;
let timeline = tenant
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, ctx)
.await
.unwrap();
let initdb_layer = timeline
.layers
.read()
.await
.likely_resident_layers()
.next()
.unwrap();
{
let mut writer = timeline.writer().await;
let data = [
(0x20, 12, Value::Image(Bytes::from_static(b"foobar"))),
(
0x30,
12,
Value::WalRecord(NeonWalRecord::Postgres {
will_init: false,
rec: Bytes::from_static(b"1"),
}),
),
(
0x40,
12,
Value::WalRecord(NeonWalRecord::Postgres {
will_init: true,
rec: Bytes::from_static(b"2"),
}),
),
// build an oversized value so we cannot extend and existing read over
// this
(
0x50,
12,
Value::WalRecord(NeonWalRecord::Postgres {
will_init: true,
rec: {
let mut buf =
vec![0u8; tenant.conf.max_vectored_read_bytes.0.get() + 1024];
buf.iter_mut()
.enumerate()
.for_each(|(i, slot)| *slot = (i % 256) as u8);
Bytes::from(buf)
},
}),
),
// because the oversized read cannot be extended further, we are sure to exercise the
// builder created on the last round with this:
(
0x60,
12,
Value::WalRecord(NeonWalRecord::Postgres {
will_init: true,
rec: Bytes::from_static(b"3"),
}),
),
(
0x60,
9,
Value::Image(Bytes::from_static(b"something for a different key")),
),
];
let mut last_lsn = None;
for (lsn, key, value) in data {
let key = Key::from_i128(key);
writer.put(key, Lsn(lsn), &value, ctx).await.unwrap();
last_lsn = Some(lsn);
}
writer.finish_write(Lsn(last_lsn.unwrap()));
}
timeline.freeze_and_flush().await.unwrap();
let new_layer = timeline
.layers
.read()
.await
.likely_resident_layers()
.find(|x| x != &initdb_layer)
.unwrap();
// create a copy for the timeline, so we don't overwrite the file
let branch = tenant
.branch_timeline_test(&timeline, TimelineId::generate(), None, ctx)
.await
.unwrap();
assert_eq!(branch.get_ancestor_lsn(), Lsn(0x60));
// truncating at 0x61 gives us a full copy, otherwise just go backwards until there's just
// a single key
for truncate_at in [0x61, 0x51, 0x41, 0x31, 0x21] {
let truncate_at = Lsn(truncate_at);
let mut writer = DeltaLayerWriter::new(
tenant.conf,
branch.timeline_id,
tenant.tenant_shard_id,
Key::MIN,
Lsn(0x11)..truncate_at,
)
.await
.unwrap();
let new_layer = new_layer.download_and_keep_resident().await.unwrap();
new_layer
.copy_delta_prefix(&mut writer, truncate_at, ctx)
.await
.unwrap();
let copied_layer = writer.finish(Key::MAX, &branch).await.unwrap();
copied_layer.as_delta(ctx).await.unwrap();
assert_keys_and_values_eq(
new_layer.as_delta(ctx).await.unwrap(),
copied_layer.as_delta(ctx).await.unwrap(),
truncate_at,
ctx,
)
.await;
}
}
async fn assert_keys_and_values_eq(
source: &DeltaLayerInner,
truncated: &DeltaLayerInner,
truncated_at: Lsn,
ctx: &RequestContext,
) {
use futures::future::ready;
use futures::stream::TryStreamExt;
let start_key = [0u8; DELTA_KEY_SIZE];
let source_reader = FileBlockReader::new(&source.file, source.file_id);
let source_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
source.index_start_blk,
source.index_root_blk,
&source_reader,
);
let source_stream = source.stream_index_forwards(&source_tree, &start_key, ctx);
let source_stream = source_stream.filter(|res| match res {
Ok((_, lsn, _)) => ready(lsn < &truncated_at),
_ => ready(true),
});
let mut source_stream = std::pin::pin!(source_stream);
let truncated_reader = FileBlockReader::new(&truncated.file, truncated.file_id);
let truncated_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
truncated.index_start_blk,
truncated.index_root_blk,
&truncated_reader,
);
let truncated_stream = truncated.stream_index_forwards(&truncated_tree, &start_key, ctx);
let mut truncated_stream = std::pin::pin!(truncated_stream);
let mut scratch_left = Vec::new();
let mut scratch_right = Vec::new();
loop {
let (src, truncated) = (source_stream.try_next(), truncated_stream.try_next());
let (src, truncated) = tokio::try_join!(src, truncated).unwrap();
if src.is_none() {
assert!(truncated.is_none());
break;
}
let (src, truncated) = (src.unwrap(), truncated.unwrap());
// because we've filtered the source with Lsn, we should always have the same keys from both.
assert_eq!(src.0, truncated.0);
assert_eq!(src.1, truncated.1);
// if this is needed for something else, just drop this assert.
assert!(
src.2.pos() >= truncated.2.pos(),
"value position should not go backwards {} vs. {}",
src.2.pos(),
truncated.2.pos()
);
scratch_left.clear();
let src_cursor = source_reader.block_cursor();
let left = src_cursor.read_blob_into_buf(src.2.pos(), &mut scratch_left, ctx);
scratch_right.clear();
let trunc_cursor = truncated_reader.block_cursor();
let right = trunc_cursor.read_blob_into_buf(truncated.2.pos(), &mut scratch_right, ctx);
tokio::try_join!(left, right).unwrap();
assert_eq!(utils::Hex(&scratch_left), utils::Hex(&scratch_right));
}
}
}

View File

@@ -116,6 +116,12 @@ impl AsLayerDesc for Layer {
}
}
impl PartialEq for Layer {
fn eq(&self, other: &Self) -> bool {
Arc::as_ptr(&self.0) == Arc::as_ptr(&other.0)
}
}
impl Layer {
/// Creates a layer value for a file we know to not be resident.
pub(crate) fn for_evicted(
@@ -604,9 +610,17 @@ enum Status {
impl Drop for LayerInner {
fn drop(&mut self) {
// if there was a pending eviction, mark it cancelled here to balance metrics
if let Some((ResidentOrWantedEvicted::WantedEvicted(..), _)) = self.inner.take_and_deinit()
{
// eviction has already been started
LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);
// eviction request is intentionally not honored as no one is present to wait for it
// and we could be delaying shutdown for nothing.
}
if !*self.wanted_deleted.get_mut() {
// should we try to evict if the last wish was for eviction? seems more like a hazard
// than a clear win.
return;
}
@@ -1552,8 +1566,8 @@ impl Drop for DownloadedLayer {
if let Some(owner) = self.owner.upgrade() {
owner.on_downloaded_layer_drop(self.version);
} else {
// no need to do anything, we are shutting down
LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);
// Layer::drop will handle cancelling the eviction; because of drop order and
// `DownloadedLayer` never leaking, we cannot know here if eviction was requested.
}
}
}
@@ -1752,6 +1766,28 @@ impl ResidentLayer {
}
}
/// FIXME: truncate is bad name because we are not truncating anything, but copying the
/// filtered parts.
#[cfg(test)]
pub(super) async fn copy_delta_prefix(
&self,
writer: &mut super::delta_layer::DeltaLayerWriter,
truncate_at: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<()> {
use LayerKind::*;
let owner = &self.owner.0;
match self.downloaded.get(owner, ctx).await? {
Delta(ref d) => d
.copy_prefix(writer, truncate_at, ctx)
.await
.with_context(|| format!("truncate {self}")),
Image(_) => anyhow::bail!(format!("cannot truncate image layer {self}")),
}
}
pub(crate) fn local_path(&self) -> &Utf8Path {
&self.owner.0.path
}
@@ -1761,14 +1797,14 @@ impl ResidentLayer {
}
#[cfg(test)]
pub(crate) async fn get_inner_delta<'a>(
&'a self,
pub(crate) async fn as_delta(
&self,
ctx: &RequestContext,
) -> anyhow::Result<&'a delta_layer::DeltaLayerInner> {
let owner = &self.owner.0;
match self.downloaded.get(owner, ctx).await? {
LayerKind::Delta(d) => Ok(d),
LayerKind::Image(_) => Err(anyhow::anyhow!("Expected a delta layer")),
) -> anyhow::Result<&delta_layer::DeltaLayerInner> {
use LayerKind::*;
match self.downloaded.get(&self.owner.0, ctx).await? {
Delta(ref d) => Ok(d),
Image(_) => Err(anyhow::anyhow!("image layer")),
}
}
}

View File

@@ -721,6 +721,103 @@ async fn evict_and_wait_does_not_wait_for_download() {
layer.evict_and_wait(FOREVER).await.unwrap();
}
/// Asserts that there is no miscalculation when Layer is dropped while it is being kept resident,
/// which is the last value.
///
/// Also checks that the same does not happen on a non-evicted layer (regression test).
#[tokio::test(start_paused = true)]
async fn eviction_cancellation_on_drop() {
use crate::repository::Value;
use bytes::Bytes;
// this is the runtime on which Layer spawns the blocking tasks on
let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create("eviction_cancellation_on_drop").unwrap();
utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
let (tenant, ctx) = h.load().await;
let timeline = tenant
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
.await
.unwrap();
{
// create_test_timeline wrote us one layer, write another
let mut writer = timeline.writer().await;
writer
.put(
Key::from_i128(5),
Lsn(0x20),
&Value::Image(Bytes::from_static(b"this does not matter either")),
&ctx,
)
.await
.unwrap();
writer.finish_write(Lsn(0x20));
}
timeline.freeze_and_flush().await.unwrap();
// wait for the upload to complete so our Arc::strong_count assertion holds
timeline
.remote_client
.as_ref()
.unwrap()
.wait_completion()
.await
.unwrap();
let (evicted_layer, not_evicted) = {
let mut layers = {
let mut guard = timeline.layers.write().await;
let layers = guard.likely_resident_layers().collect::<Vec<_>>();
// remove the layers from layermap
guard.finish_gc_timeline(&layers);
layers
};
assert_eq!(layers.len(), 2);
(layers.pop().unwrap(), layers.pop().unwrap())
};
let victims = [(evicted_layer, true), (not_evicted, false)];
for (victim, evict) in victims {
let resident = victim.keep_resident().await.unwrap();
drop(victim);
assert_eq!(Arc::strong_count(&resident.owner.0), 1);
if evict {
let evict_and_wait = resident.owner.evict_and_wait(FOREVER);
// drive the future to await on the status channel, and then drop it
tokio::time::timeout(ADVANCE, evict_and_wait)
.await
.expect_err("should had been a timeout since we are holding the layer resident");
}
// 1 == we only evict one of the layers
assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
drop(resident);
// run any spawned
tokio::time::sleep(ADVANCE).await;
SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
assert_eq!(
1,
LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::LayerGone].get()
);
}
}
#[test]
fn layer_size() {
assert_eq!(std::mem::size_of::<LayerAccessStats>(), 2040);

View File

@@ -2968,7 +2968,8 @@ impl Timeline {
break;
}
cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
// Take the min to avoid reconstructing a page with data newer than request Lsn.
cont_lsn = std::cmp::min(Lsn(request_lsn.0 + 1), Lsn(timeline.ancestor_lsn.0 + 1));
timeline_owned = timeline
.get_ready_ancestor_timeline(ctx)
.await

View File

@@ -61,18 +61,18 @@ pub struct VectoredRead {
}
impl VectoredRead {
pub fn size(&self) -> usize {
pub(crate) fn size(&self) -> usize {
(self.end - self.start) as usize
}
}
#[derive(Eq, PartialEq)]
enum VectoredReadExtended {
pub(crate) enum VectoredReadExtended {
Yes,
No,
}
struct VectoredReadBuilder {
pub(crate) struct VectoredReadBuilder {
start: u64,
end: u64,
blobs_at: VecMap<u64, BlobMeta>,
@@ -80,7 +80,17 @@ struct VectoredReadBuilder {
}
impl VectoredReadBuilder {
fn new(start_offset: u64, end_offset: u64, meta: BlobMeta, max_read_size: usize) -> Self {
/// Start building a new vectored read.
///
/// Note that by design, this does not check against reading more than `max_read_size` to
/// support reading larger blobs than the configuration value. The builder will be single use
/// however after that.
pub(crate) fn new(
start_offset: u64,
end_offset: u64,
meta: BlobMeta,
max_read_size: usize,
) -> Self {
let mut blobs_at = VecMap::default();
blobs_at
.append(start_offset, meta)
@@ -97,7 +107,8 @@ impl VectoredReadBuilder {
/// Attempt to extend the current read with a new blob if the start
/// offset matches with the current end of the vectored read
/// and the resuting size is below the max read size
fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
tracing::trace!(start, end, "trying to extend");
let size = (end - start) as usize;
if self.end == start && self.size() + size <= self.max_read_size {
self.end = end;
@@ -111,11 +122,11 @@ impl VectoredReadBuilder {
VectoredReadExtended::No
}
fn size(&self) -> usize {
pub(crate) fn size(&self) -> usize {
(self.end - self.start) as usize
}
fn build(self) -> VectoredRead {
pub(crate) fn build(self) -> VectoredRead {
VectoredRead {
start: self.start,
end: self.end,

View File

@@ -55,6 +55,7 @@ impl NeonWalRecord {
/// Does replaying this WAL record initialize the page from scratch, or does
/// it need to be applied over the previous image of the page?
pub fn will_init(&self) -> bool {
// If you change this function, you'll also need to change ValueBytes::will_init
match self {
NeonWalRecord::Postgres { will_init, rec: _ } => *will_init,

156
poetry.lock generated
View File

@@ -2,87 +2,87 @@
[[package]]
name = "aiohttp"
version = "3.9.2"
version = "3.9.4"
description = "Async http client/server framework (asyncio)"
optional = false
python-versions = ">=3.8"
files = [
{file = "aiohttp-3.9.2-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:772fbe371788e61c58d6d3d904268e48a594ba866804d08c995ad71b144f94cb"},
{file = "aiohttp-3.9.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:edd4f1af2253f227ae311ab3d403d0c506c9b4410c7fc8d9573dec6d9740369f"},
{file = "aiohttp-3.9.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:cfee9287778399fdef6f8a11c9e425e1cb13cc9920fd3a3df8f122500978292b"},
{file = "aiohttp-3.9.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3cc158466f6a980a6095ee55174d1de5730ad7dec251be655d9a6a9dd7ea1ff9"},
{file = "aiohttp-3.9.2-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:54ec82f45d57c9a65a1ead3953b51c704f9587440e6682f689da97f3e8defa35"},
{file = "aiohttp-3.9.2-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:abeb813a18eb387f0d835ef51f88568540ad0325807a77a6e501fed4610f864e"},
{file = "aiohttp-3.9.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:cc91d07280d7d169f3a0f9179d8babd0ee05c79d4d891447629ff0d7d8089ec2"},
{file = "aiohttp-3.9.2-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b65e861f4bebfb660f7f0f40fa3eb9f2ab9af10647d05dac824390e7af8f75b7"},
{file = "aiohttp-3.9.2-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:04fd8ffd2be73d42bcf55fd78cde7958eeee6d4d8f73c3846b7cba491ecdb570"},
{file = "aiohttp-3.9.2-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:3d8d962b439a859b3ded9a1e111a4615357b01620a546bc601f25b0211f2da81"},
{file = "aiohttp-3.9.2-cp310-cp310-musllinux_1_1_ppc64le.whl", hash = "sha256:8ceb658afd12b27552597cf9a65d9807d58aef45adbb58616cdd5ad4c258c39e"},
{file = "aiohttp-3.9.2-cp310-cp310-musllinux_1_1_s390x.whl", hash = "sha256:0e4ee4df741670560b1bc393672035418bf9063718fee05e1796bf867e995fad"},
{file = "aiohttp-3.9.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:2dec87a556f300d3211decf018bfd263424f0690fcca00de94a837949fbcea02"},
{file = "aiohttp-3.9.2-cp310-cp310-win32.whl", hash = "sha256:3e1a800f988ce7c4917f34096f81585a73dbf65b5c39618b37926b1238cf9bc4"},
{file = "aiohttp-3.9.2-cp310-cp310-win_amd64.whl", hash = "sha256:ea510718a41b95c236c992b89fdfc3d04cc7ca60281f93aaada497c2b4e05c46"},
{file = "aiohttp-3.9.2-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:6aaa6f99256dd1b5756a50891a20f0d252bd7bdb0854c5d440edab4495c9f973"},
{file = "aiohttp-3.9.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:a27d8c70ad87bcfce2e97488652075a9bdd5b70093f50b10ae051dfe5e6baf37"},
{file = "aiohttp-3.9.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:54287bcb74d21715ac8382e9de146d9442b5f133d9babb7e5d9e453faadd005e"},
{file = "aiohttp-3.9.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5bb3d05569aa83011fcb346b5266e00b04180105fcacc63743fc2e4a1862a891"},
{file = "aiohttp-3.9.2-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:c8534e7d69bb8e8d134fe2be9890d1b863518582f30c9874ed7ed12e48abe3c4"},
{file = "aiohttp-3.9.2-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:4bd9d5b989d57b41e4ff56ab250c5ddf259f32db17159cce630fd543376bd96b"},
{file = "aiohttp-3.9.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fa6904088e6642609981f919ba775838ebf7df7fe64998b1a954fb411ffb4663"},
{file = "aiohttp-3.9.2-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:bda42eb410be91b349fb4ee3a23a30ee301c391e503996a638d05659d76ea4c2"},
{file = "aiohttp-3.9.2-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:193cc1ccd69d819562cc7f345c815a6fc51d223b2ef22f23c1a0f67a88de9a72"},
{file = "aiohttp-3.9.2-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:b9f1cb839b621f84a5b006848e336cf1496688059d2408e617af33e3470ba204"},
{file = "aiohttp-3.9.2-cp311-cp311-musllinux_1_1_ppc64le.whl", hash = "sha256:d22a0931848b8c7a023c695fa2057c6aaac19085f257d48baa24455e67df97ec"},
{file = "aiohttp-3.9.2-cp311-cp311-musllinux_1_1_s390x.whl", hash = "sha256:4112d8ba61fbd0abd5d43a9cb312214565b446d926e282a6d7da3f5a5aa71d36"},
{file = "aiohttp-3.9.2-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:c4ad4241b52bb2eb7a4d2bde060d31c2b255b8c6597dd8deac2f039168d14fd7"},
{file = "aiohttp-3.9.2-cp311-cp311-win32.whl", hash = "sha256:ee2661a3f5b529f4fc8a8ffee9f736ae054adfb353a0d2f78218be90617194b3"},
{file = "aiohttp-3.9.2-cp311-cp311-win_amd64.whl", hash = "sha256:4deae2c165a5db1ed97df2868ef31ca3cc999988812e82386d22937d9d6fed52"},
{file = "aiohttp-3.9.2-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:6f4cdba12539215aaecf3c310ce9d067b0081a0795dd8a8805fdb67a65c0572a"},
{file = "aiohttp-3.9.2-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:84e843b33d5460a5c501c05539809ff3aee07436296ff9fbc4d327e32aa3a326"},
{file = "aiohttp-3.9.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:8008d0f451d66140a5aa1c17e3eedc9d56e14207568cd42072c9d6b92bf19b52"},
{file = "aiohttp-3.9.2-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:61c47ab8ef629793c086378b1df93d18438612d3ed60dca76c3422f4fbafa792"},
{file = "aiohttp-3.9.2-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:bc71f748e12284312f140eaa6599a520389273174b42c345d13c7e07792f4f57"},
{file = "aiohttp-3.9.2-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:a1c3a4d0ab2f75f22ec80bca62385db2e8810ee12efa8c9e92efea45c1849133"},
{file = "aiohttp-3.9.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9a87aa0b13bbee025faa59fa58861303c2b064b9855d4c0e45ec70182bbeba1b"},
{file = "aiohttp-3.9.2-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e2cc0d04688b9f4a7854c56c18aa7af9e5b0a87a28f934e2e596ba7e14783192"},
{file = "aiohttp-3.9.2-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:1956e3ac376b1711c1533266dec4efd485f821d84c13ce1217d53e42c9e65f08"},
{file = "aiohttp-3.9.2-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:114da29f39eccd71b93a0fcacff178749a5c3559009b4a4498c2c173a6d74dff"},
{file = "aiohttp-3.9.2-cp312-cp312-musllinux_1_1_ppc64le.whl", hash = "sha256:3f17999ae3927d8a9a823a1283b201344a0627272f92d4f3e3a4efe276972fe8"},
{file = "aiohttp-3.9.2-cp312-cp312-musllinux_1_1_s390x.whl", hash = "sha256:f31df6a32217a34ae2f813b152a6f348154f948c83213b690e59d9e84020925c"},
{file = "aiohttp-3.9.2-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:7a75307ffe31329928a8d47eae0692192327c599113d41b278d4c12b54e1bd11"},
{file = "aiohttp-3.9.2-cp312-cp312-win32.whl", hash = "sha256:972b63d589ff8f305463593050a31b5ce91638918da38139b9d8deaba9e0fed7"},
{file = "aiohttp-3.9.2-cp312-cp312-win_amd64.whl", hash = "sha256:200dc0246f0cb5405c80d18ac905c8350179c063ea1587580e3335bfc243ba6a"},
{file = "aiohttp-3.9.2-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:158564d0d1020e0d3fe919a81d97aadad35171e13e7b425b244ad4337fc6793a"},
{file = "aiohttp-3.9.2-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:da1346cd0ccb395f0ed16b113ebb626fa43b7b07fd7344fce33e7a4f04a8897a"},
{file = "aiohttp-3.9.2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:eaa9256de26ea0334ffa25f1913ae15a51e35c529a1ed9af8e6286dd44312554"},
{file = "aiohttp-3.9.2-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1543e7fb00214fb4ccead42e6a7d86f3bb7c34751ec7c605cca7388e525fd0b4"},
{file = "aiohttp-3.9.2-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:186e94570433a004e05f31f632726ae0f2c9dee4762a9ce915769ce9c0a23d89"},
{file = "aiohttp-3.9.2-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:d52d20832ac1560f4510d68e7ba8befbc801a2b77df12bd0cd2bcf3b049e52a4"},
{file = "aiohttp-3.9.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1c45e4e815ac6af3b72ca2bde9b608d2571737bb1e2d42299fc1ffdf60f6f9a1"},
{file = "aiohttp-3.9.2-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:aa906b9bdfd4a7972dd0628dbbd6413d2062df5b431194486a78f0d2ae87bd55"},
{file = "aiohttp-3.9.2-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:68bbee9e17d66f17bb0010aa15a22c6eb28583edcc8b3212e2b8e3f77f3ebe2a"},
{file = "aiohttp-3.9.2-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:4c189b64bd6d9a403a1a3f86a3ab3acbc3dc41a68f73a268a4f683f89a4dec1f"},
{file = "aiohttp-3.9.2-cp38-cp38-musllinux_1_1_ppc64le.whl", hash = "sha256:8a7876f794523123bca6d44bfecd89c9fec9ec897a25f3dd202ee7fc5c6525b7"},
{file = "aiohttp-3.9.2-cp38-cp38-musllinux_1_1_s390x.whl", hash = "sha256:d23fba734e3dd7b1d679b9473129cd52e4ec0e65a4512b488981a56420e708db"},
{file = "aiohttp-3.9.2-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:b141753be581fab842a25cb319f79536d19c2a51995d7d8b29ee290169868eab"},
{file = "aiohttp-3.9.2-cp38-cp38-win32.whl", hash = "sha256:103daf41ff3b53ba6fa09ad410793e2e76c9d0269151812e5aba4b9dd674a7e8"},
{file = "aiohttp-3.9.2-cp38-cp38-win_amd64.whl", hash = "sha256:328918a6c2835861ff7afa8c6d2c70c35fdaf996205d5932351bdd952f33fa2f"},
{file = "aiohttp-3.9.2-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:5264d7327c9464786f74e4ec9342afbbb6ee70dfbb2ec9e3dfce7a54c8043aa3"},
{file = "aiohttp-3.9.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:07205ae0015e05c78b3288c1517afa000823a678a41594b3fdc870878d645305"},
{file = "aiohttp-3.9.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:ae0a1e638cffc3ec4d4784b8b4fd1cf28968febc4bd2718ffa25b99b96a741bd"},
{file = "aiohttp-3.9.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d43302a30ba1166325974858e6ef31727a23bdd12db40e725bec0f759abce505"},
{file = "aiohttp-3.9.2-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:16a967685907003765855999af11a79b24e70b34dc710f77a38d21cd9fc4f5fe"},
{file = "aiohttp-3.9.2-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:6fa3ee92cd441d5c2d07ca88d7a9cef50f7ec975f0117cd0c62018022a184308"},
{file = "aiohttp-3.9.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0b500c5ad9c07639d48615a770f49618130e61be36608fc9bc2d9bae31732b8f"},
{file = "aiohttp-3.9.2-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c07327b368745b1ce2393ae9e1aafed7073d9199e1dcba14e035cc646c7941bf"},
{file = "aiohttp-3.9.2-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:cc7d6502c23a0ec109687bf31909b3fb7b196faf198f8cff68c81b49eb316ea9"},
{file = "aiohttp-3.9.2-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:07be2be7071723c3509ab5c08108d3a74f2181d4964e869f2504aaab68f8d3e8"},
{file = "aiohttp-3.9.2-cp39-cp39-musllinux_1_1_ppc64le.whl", hash = "sha256:122468f6fee5fcbe67cb07014a08c195b3d4c41ff71e7b5160a7bcc41d585a5f"},
{file = "aiohttp-3.9.2-cp39-cp39-musllinux_1_1_s390x.whl", hash = "sha256:00a9abcea793c81e7f8778ca195a1714a64f6d7436c4c0bb168ad2a212627000"},
{file = "aiohttp-3.9.2-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:7a9825fdd64ecac5c670234d80bb52bdcaa4139d1f839165f548208b3779c6c6"},
{file = "aiohttp-3.9.2-cp39-cp39-win32.whl", hash = "sha256:5422cd9a4a00f24c7244e1b15aa9b87935c85fb6a00c8ac9b2527b38627a9211"},
{file = "aiohttp-3.9.2-cp39-cp39-win_amd64.whl", hash = "sha256:7d579dcd5d82a86a46f725458418458fa43686f6a7b252f2966d359033ffc8ab"},
{file = "aiohttp-3.9.2.tar.gz", hash = "sha256:b0ad0a5e86ce73f5368a164c10ada10504bf91869c05ab75d982c6048217fbf7"},
{file = "aiohttp-3.9.4-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:76d32588ef7e4a3f3adff1956a0ba96faabbdee58f2407c122dd45aa6e34f372"},
{file = "aiohttp-3.9.4-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:56181093c10dbc6ceb8a29dfeea1e815e1dfdc020169203d87fd8d37616f73f9"},
{file = "aiohttp-3.9.4-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c7a5b676d3c65e88b3aca41816bf72831898fcd73f0cbb2680e9d88e819d1e4d"},
{file = "aiohttp-3.9.4-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d1df528a85fb404899d4207a8d9934cfd6be626e30e5d3a5544a83dbae6d8a7e"},
{file = "aiohttp-3.9.4-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:f595db1bceabd71c82e92df212dd9525a8a2c6947d39e3c994c4f27d2fe15b11"},
{file = "aiohttp-3.9.4-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:9c0b09d76e5a4caac3d27752027fbd43dc987b95f3748fad2b924a03fe8632ad"},
{file = "aiohttp-3.9.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:689eb4356649ec9535b3686200b231876fb4cab4aca54e3bece71d37f50c1d13"},
{file = "aiohttp-3.9.4-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a3666cf4182efdb44d73602379a66f5fdfd5da0db5e4520f0ac0dcca644a3497"},
{file = "aiohttp-3.9.4-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:b65b0f8747b013570eea2f75726046fa54fa8e0c5db60f3b98dd5d161052004a"},
{file = "aiohttp-3.9.4-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:a1885d2470955f70dfdd33a02e1749613c5a9c5ab855f6db38e0b9389453dce7"},
{file = "aiohttp-3.9.4-cp310-cp310-musllinux_1_1_ppc64le.whl", hash = "sha256:0593822dcdb9483d41f12041ff7c90d4d1033ec0e880bcfaf102919b715f47f1"},
{file = "aiohttp-3.9.4-cp310-cp310-musllinux_1_1_s390x.whl", hash = "sha256:47f6eb74e1ecb5e19a78f4a4228aa24df7fbab3b62d4a625d3f41194a08bd54f"},
{file = "aiohttp-3.9.4-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:c8b04a3dbd54de6ccb7604242fe3ad67f2f3ca558f2d33fe19d4b08d90701a89"},
{file = "aiohttp-3.9.4-cp310-cp310-win32.whl", hash = "sha256:8a78dfb198a328bfb38e4308ca8167028920fb747ddcf086ce706fbdd23b2926"},
{file = "aiohttp-3.9.4-cp310-cp310-win_amd64.whl", hash = "sha256:e78da6b55275987cbc89141a1d8e75f5070e577c482dd48bd9123a76a96f0bbb"},
{file = "aiohttp-3.9.4-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:c111b3c69060d2bafc446917534150fd049e7aedd6cbf21ba526a5a97b4402a5"},
{file = "aiohttp-3.9.4-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:efbdd51872cf170093998c87ccdf3cb5993add3559341a8e5708bcb311934c94"},
{file = "aiohttp-3.9.4-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:7bfdb41dc6e85d8535b00d73947548a748e9534e8e4fddd2638109ff3fb081df"},
{file = "aiohttp-3.9.4-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2bd9d334412961125e9f68d5b73c1d0ab9ea3f74a58a475e6b119f5293eee7ba"},
{file = "aiohttp-3.9.4-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:35d78076736f4a668d57ade00c65d30a8ce28719d8a42471b2a06ccd1a2e3063"},
{file = "aiohttp-3.9.4-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:824dff4f9f4d0f59d0fa3577932ee9a20e09edec8a2f813e1d6b9f89ced8293f"},
{file = "aiohttp-3.9.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:52b8b4e06fc15519019e128abedaeb56412b106ab88b3c452188ca47a25c4093"},
{file = "aiohttp-3.9.4-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:eae569fb1e7559d4f3919965617bb39f9e753967fae55ce13454bec2d1c54f09"},
{file = "aiohttp-3.9.4-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:69b97aa5792428f321f72aeb2f118e56893371f27e0b7d05750bcad06fc42ca1"},
{file = "aiohttp-3.9.4-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:4d79aad0ad4b980663316f26d9a492e8fab2af77c69c0f33780a56843ad2f89e"},
{file = "aiohttp-3.9.4-cp311-cp311-musllinux_1_1_ppc64le.whl", hash = "sha256:d6577140cd7db19e430661e4b2653680194ea8c22c994bc65b7a19d8ec834403"},
{file = "aiohttp-3.9.4-cp311-cp311-musllinux_1_1_s390x.whl", hash = "sha256:9860d455847cd98eb67897f5957b7cd69fbcb436dd3f06099230f16a66e66f79"},
{file = "aiohttp-3.9.4-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:69ff36d3f8f5652994e08bd22f093e11cfd0444cea310f92e01b45a4e46b624e"},
{file = "aiohttp-3.9.4-cp311-cp311-win32.whl", hash = "sha256:e27d3b5ed2c2013bce66ad67ee57cbf614288bda8cdf426c8d8fe548316f1b5f"},
{file = "aiohttp-3.9.4-cp311-cp311-win_amd64.whl", hash = "sha256:d6a67e26daa686a6fbdb600a9af8619c80a332556245fa8e86c747d226ab1a1e"},
{file = "aiohttp-3.9.4-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:c5ff8ff44825736a4065d8544b43b43ee4c6dd1530f3a08e6c0578a813b0aa35"},
{file = "aiohttp-3.9.4-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:d12a244627eba4e9dc52cbf924edef905ddd6cafc6513849b4876076a6f38b0e"},
{file = "aiohttp-3.9.4-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:dcad56c8d8348e7e468899d2fb3b309b9bc59d94e6db08710555f7436156097f"},
{file = "aiohttp-3.9.4-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4f7e69a7fd4b5ce419238388e55abd220336bd32212c673ceabc57ccf3d05b55"},
{file = "aiohttp-3.9.4-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:c4870cb049f10d7680c239b55428916d84158798eb8f353e74fa2c98980dcc0b"},
{file = "aiohttp-3.9.4-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:3b2feaf1b7031ede1bc0880cec4b0776fd347259a723d625357bb4b82f62687b"},
{file = "aiohttp-3.9.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:939393e8c3f0a5bcd33ef7ace67680c318dc2ae406f15e381c0054dd658397de"},
{file = "aiohttp-3.9.4-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7d2334e387b2adcc944680bebcf412743f2caf4eeebd550f67249c1c3696be04"},
{file = "aiohttp-3.9.4-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:e0198ea897680e480845ec0ffc5a14e8b694e25b3f104f63676d55bf76a82f1a"},
{file = "aiohttp-3.9.4-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:e40d2cd22914d67c84824045861a5bb0fb46586b15dfe4f046c7495bf08306b2"},
{file = "aiohttp-3.9.4-cp312-cp312-musllinux_1_1_ppc64le.whl", hash = "sha256:aba80e77c227f4234aa34a5ff2b6ff30c5d6a827a91d22ff6b999de9175d71bd"},
{file = "aiohttp-3.9.4-cp312-cp312-musllinux_1_1_s390x.whl", hash = "sha256:fb68dc73bc8ac322d2e392a59a9e396c4f35cb6fdbdd749e139d1d6c985f2527"},
{file = "aiohttp-3.9.4-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:f3460a92638dce7e47062cf088d6e7663adb135e936cb117be88d5e6c48c9d53"},
{file = "aiohttp-3.9.4-cp312-cp312-win32.whl", hash = "sha256:32dc814ddbb254f6170bca198fe307920f6c1308a5492f049f7f63554b88ef36"},
{file = "aiohttp-3.9.4-cp312-cp312-win_amd64.whl", hash = "sha256:63f41a909d182d2b78fe3abef557fcc14da50c7852f70ae3be60e83ff64edba5"},
{file = "aiohttp-3.9.4-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:c3770365675f6be220032f6609a8fbad994d6dcf3ef7dbcf295c7ee70884c9af"},
{file = "aiohttp-3.9.4-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:305edae1dea368ce09bcb858cf5a63a064f3bff4767dec6fa60a0cc0e805a1d3"},
{file = "aiohttp-3.9.4-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:6f121900131d116e4a93b55ab0d12ad72573f967b100e49086e496a9b24523ea"},
{file = "aiohttp-3.9.4-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b71e614c1ae35c3d62a293b19eface83d5e4d194e3eb2fabb10059d33e6e8cbf"},
{file = "aiohttp-3.9.4-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:419f009fa4cfde4d16a7fc070d64f36d70a8d35a90d71aa27670bba2be4fd039"},
{file = "aiohttp-3.9.4-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:7b39476ee69cfe64061fd77a73bf692c40021f8547cda617a3466530ef63f947"},
{file = "aiohttp-3.9.4-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b33f34c9c7decdb2ab99c74be6443942b730b56d9c5ee48fb7df2c86492f293c"},
{file = "aiohttp-3.9.4-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c78700130ce2dcebb1a8103202ae795be2fa8c9351d0dd22338fe3dac74847d9"},
{file = "aiohttp-3.9.4-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:268ba22d917655d1259af2d5659072b7dc11b4e1dc2cb9662fdd867d75afc6a4"},
{file = "aiohttp-3.9.4-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:17e7c051f53a0d2ebf33013a9cbf020bb4e098c4bc5bce6f7b0c962108d97eab"},
{file = "aiohttp-3.9.4-cp38-cp38-musllinux_1_1_ppc64le.whl", hash = "sha256:7be99f4abb008cb38e144f85f515598f4c2c8932bf11b65add0ff59c9c876d99"},
{file = "aiohttp-3.9.4-cp38-cp38-musllinux_1_1_s390x.whl", hash = "sha256:d58a54d6ff08d2547656356eea8572b224e6f9bbc0cf55fa9966bcaac4ddfb10"},
{file = "aiohttp-3.9.4-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:7673a76772bda15d0d10d1aa881b7911d0580c980dbd16e59d7ba1422b2d83cd"},
{file = "aiohttp-3.9.4-cp38-cp38-win32.whl", hash = "sha256:e4370dda04dc8951012f30e1ce7956a0a226ac0714a7b6c389fb2f43f22a250e"},
{file = "aiohttp-3.9.4-cp38-cp38-win_amd64.whl", hash = "sha256:eb30c4510a691bb87081192a394fb661860e75ca3896c01c6d186febe7c88530"},
{file = "aiohttp-3.9.4-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:84e90494db7df3be5e056f91412f9fa9e611fbe8ce4aaef70647297f5943b276"},
{file = "aiohttp-3.9.4-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:7d4845f8501ab28ebfdbeab980a50a273b415cf69e96e4e674d43d86a464df9d"},
{file = "aiohttp-3.9.4-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:69046cd9a2a17245c4ce3c1f1a4ff8c70c7701ef222fce3d1d8435f09042bba1"},
{file = "aiohttp-3.9.4-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8b73a06bafc8dcc508420db43b4dd5850e41e69de99009d0351c4f3007960019"},
{file = "aiohttp-3.9.4-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:418bb0038dfafeac923823c2e63226179976c76f981a2aaad0ad5d51f2229bca"},
{file = "aiohttp-3.9.4-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:71a8f241456b6c2668374d5d28398f8e8cdae4cce568aaea54e0f39359cd928d"},
{file = "aiohttp-3.9.4-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:935c369bf8acc2dc26f6eeb5222768aa7c62917c3554f7215f2ead7386b33748"},
{file = "aiohttp-3.9.4-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:74e4e48c8752d14ecfb36d2ebb3d76d614320570e14de0a3aa7a726ff150a03c"},
{file = "aiohttp-3.9.4-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:916b0417aeddf2c8c61291238ce25286f391a6acb6f28005dd9ce282bd6311b6"},
{file = "aiohttp-3.9.4-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:9b6787b6d0b3518b2ee4cbeadd24a507756ee703adbac1ab6dc7c4434b8c572a"},
{file = "aiohttp-3.9.4-cp39-cp39-musllinux_1_1_ppc64le.whl", hash = "sha256:221204dbda5ef350e8db6287937621cf75e85778b296c9c52260b522231940ed"},
{file = "aiohttp-3.9.4-cp39-cp39-musllinux_1_1_s390x.whl", hash = "sha256:10afd99b8251022ddf81eaed1d90f5a988e349ee7d779eb429fb07b670751e8c"},
{file = "aiohttp-3.9.4-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:2506d9f7a9b91033201be9ffe7d89c6a54150b0578803cce5cb84a943d075bc3"},
{file = "aiohttp-3.9.4-cp39-cp39-win32.whl", hash = "sha256:e571fdd9efd65e86c6af2f332e0e95dad259bfe6beb5d15b3c3eca3a6eb5d87b"},
{file = "aiohttp-3.9.4-cp39-cp39-win_amd64.whl", hash = "sha256:7d29dd5319d20aa3b7749719ac9685fbd926f71ac8c77b2477272725f882072d"},
{file = "aiohttp-3.9.4.tar.gz", hash = "sha256:6ff71ede6d9a5a58cfb7b6fffc83ab5d4a63138276c771ac91ceaaddf5459644"},
]
[package.dependencies]
@@ -2900,4 +2900,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.0"
python-versions = "^3.9"
content-hash = "df7161da4fdc3cba0a445176fc9dda2a0e8a53e13a7aa8a864385ca259381b41"
content-hash = "b3452b50901123fd5f2c385ce8a0c1c492296393b8a7926a322b6df0ea3ac572"

View File

@@ -331,7 +331,6 @@ async fn main() -> anyhow::Result<()> {
let proxy_listener = TcpListener::bind(proxy_address).await?;
let cancellation_token = CancellationToken::new();
let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new(&config.endpoint_rps_limit));
let cancel_map = CancelMap::default();
let redis_publisher = match &regional_redis_client {
@@ -357,7 +356,6 @@ async fn main() -> anyhow::Result<()> {
config,
proxy_listener,
cancellation_token.clone(),
endpoint_rate_limiter.clone(),
cancellation_handler.clone(),
));
@@ -372,7 +370,6 @@ async fn main() -> anyhow::Result<()> {
config,
serverless_listener,
cancellation_token.clone(),
endpoint_rate_limiter.clone(),
cancellation_handler.clone(),
));
}
@@ -533,7 +530,11 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
let url = args.auth_endpoint.parse()?;
let endpoint = http::Endpoint::new(url, http::new_client());
let api = console::provider::neon::Api::new(endpoint, caches, locks);
let mut endpoint_rps_limit = args.endpoint_rps_limit.clone();
RateBucketInfo::validate(&mut endpoint_rps_limit)?;
let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new(endpoint_rps_limit));
let api =
console::provider::neon::Api::new(endpoint, caches, locks, endpoint_rate_limiter);
let api = console::provider::ConsoleBackend::Console(api);
auth::BackendType::Console(MaybeOwned::Owned(api), ())
}
@@ -567,8 +568,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
rate_limit_ip_subnet: args.auth_rate_limit_ip_subnet,
};
let mut endpoint_rps_limit = args.endpoint_rps_limit.clone();
RateBucketInfo::validate(&mut endpoint_rps_limit)?;
let mut redis_rps_limit = args.redis_rps_limit.clone();
RateBucketInfo::validate(&mut redis_rps_limit)?;
@@ -581,7 +580,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
authentication_config,
require_client_ip: args.require_client_ip,
disable_ip_check_for_http: args.disable_ip_check_for_http,
endpoint_rps_limit,
redis_rps_limit,
handshake_timeout: args.handshake_timeout,
region: args.region.clone(),

View File

@@ -70,20 +70,14 @@ impl EndpointsCache {
if !self.ready.load(Ordering::Acquire) {
return true;
}
// If cache is disabled, just collect the metrics and return.
if self.config.disable_cache {
let rejected = self.should_reject(endpoint);
ctx.set_rejected(rejected);
info!(?rejected, "check endpoint is valid, disabled cache");
return true;
}
// If the limiter allows, we don't need to check the cache.
if self.limiter.lock().await.check() {
return true;
}
let rejected = self.should_reject(endpoint);
info!(?rejected, "check endpoint is valid, enabled cache");
ctx.set_rejected(rejected);
info!(?rejected, "check endpoint is valid, disabled cache");
// If cache is disabled, just collect the metrics and return or
// If the limiter allows, we don't need to check the cache.
if self.config.disable_cache || self.limiter.lock().await.check() {
return true;
}
!rejected
}
fn should_reject(&self, endpoint: &EndpointId) -> bool {

View File

@@ -29,7 +29,6 @@ pub struct ProxyConfig {
pub authentication_config: AuthenticationConfig,
pub require_client_ip: bool,
pub disable_ip_check_for_http: bool,
pub endpoint_rps_limit: Vec<RateBucketInfo>,
pub redis_rps_limit: Vec<RateBucketInfo>,
pub region: String,
pub handshake_timeout: Duration,

View File

@@ -208,6 +208,9 @@ pub mod errors {
#[error(transparent)]
ApiError(ApiError),
#[error("Too many connections attempts")]
TooManyConnections,
#[error("Timeout waiting to acquire wake compute lock")]
TimeoutError,
}
@@ -240,6 +243,8 @@ pub mod errors {
// However, API might return a meaningful error.
ApiError(e) => e.to_string_client(),
TooManyConnections => self.to_string(),
TimeoutError => "timeout while acquiring the compute resource lock".to_owned(),
}
}
@@ -250,6 +255,7 @@ pub mod errors {
match self {
WakeComputeError::BadComputeAddress(_) => crate::error::ErrorKind::ControlPlane,
WakeComputeError::ApiError(e) => e.get_error_kind(),
WakeComputeError::TooManyConnections => crate::error::ErrorKind::RateLimit,
WakeComputeError::TimeoutError => crate::error::ErrorKind::ServiceRateLimit,
}
}

View File

@@ -12,6 +12,7 @@ use crate::{
console::messages::ColdStartInfo,
http,
metrics::{CacheOutcome, Metrics},
rate_limiter::EndpointRateLimiter,
scram, Normalize,
};
use crate::{cache::Cached, context::RequestMonitoring};
@@ -25,6 +26,7 @@ pub struct Api {
endpoint: http::Endpoint,
pub caches: &'static ApiCaches,
pub locks: &'static ApiLocks,
pub endpoint_rate_limiter: Arc<EndpointRateLimiter>,
jwt: String,
}
@@ -34,6 +36,7 @@ impl Api {
endpoint: http::Endpoint,
caches: &'static ApiCaches,
locks: &'static ApiLocks,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
) -> Self {
let jwt: String = match std::env::var("NEON_PROXY_TO_CONTROLPLANE_TOKEN") {
Ok(v) => v,
@@ -43,6 +46,7 @@ impl Api {
endpoint,
caches,
locks,
endpoint_rate_limiter,
jwt,
}
}
@@ -277,6 +281,14 @@ impl super::Api for Api {
return Ok(cached);
}
// check rate limit
if !self
.endpoint_rate_limiter
.check(user_info.endpoint.normalize().into(), 1)
{
return Err(WakeComputeError::TooManyConnections);
}
let permit = self.locks.get_wake_compute_permit(&key).await?;
// after getting back a permit - it's possible the cache was filled

View File

@@ -51,7 +51,7 @@ pub struct RequestMonitoring {
sender: Option<mpsc::UnboundedSender<RequestData>>,
pub latency_timer: LatencyTimer,
// Whether proxy decided that it's not a valid endpoint end rejected it before going to cplane.
rejected: bool,
rejected: Option<bool>,
}
#[derive(Clone, Debug)]
@@ -96,7 +96,7 @@ impl RequestMonitoring {
error_kind: None,
auth_method: None,
success: false,
rejected: false,
rejected: None,
cold_start_info: ColdStartInfo::Unknown,
sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
@@ -118,7 +118,7 @@ impl RequestMonitoring {
}
pub fn set_rejected(&mut self, rejected: bool) {
self.rejected = rejected;
self.rejected = Some(rejected);
}
pub fn set_cold_start_info(&mut self, info: ColdStartInfo) {
@@ -200,27 +200,28 @@ impl Drop for RequestMonitoring {
} else {
ConnectOutcome::Failed
};
let rejected = self.rejected;
let ep = self
.endpoint_id
.as_ref()
.map(|x| x.as_str())
.unwrap_or_default();
// This makes sense only if cache is disabled
info!(
?ep,
?outcome,
?rejected,
"check endpoint is valid with outcome"
);
Metrics::get()
.proxy
.invalid_endpoints_total
.inc(InvalidEndpointsGroup {
protocol: self.protocol,
rejected: rejected.into(),
outcome,
});
if let Some(rejected) = self.rejected {
let ep = self
.endpoint_id
.as_ref()
.map(|x| x.as_str())
.unwrap_or_default();
// This makes sense only if cache is disabled
info!(
?outcome,
?rejected,
?ep,
"check endpoint is valid with outcome"
);
Metrics::get()
.proxy
.invalid_endpoints_total
.inc(InvalidEndpointsGroup {
protocol: self.protocol,
rejected: rejected.into(),
outcome,
});
}
if let Some(tx) = self.sender.take() {
let _: Result<(), _> = tx.send(RequestData::from(&*self));
}

View File

@@ -19,9 +19,8 @@ use crate::{
metrics::{Metrics, NumClientConnectionsGuard},
protocol2::WithClientIp,
proxy::handshake::{handshake, HandshakeData},
rate_limiter::EndpointRateLimiter,
stream::{PqStream, Stream},
EndpointCacheKey, Normalize,
EndpointCacheKey,
};
use futures::TryFutureExt;
use itertools::Itertools;
@@ -61,7 +60,6 @@ pub async fn task_main(
config: &'static ProxyConfig,
listener: tokio::net::TcpListener,
cancellation_token: CancellationToken,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
cancellation_handler: Arc<CancellationHandlerMain>,
) -> anyhow::Result<()> {
scopeguard::defer! {
@@ -86,7 +84,6 @@ pub async fn task_main(
let session_id = uuid::Uuid::new_v4();
let cancellation_handler = Arc::clone(&cancellation_handler);
let endpoint_rate_limiter = endpoint_rate_limiter.clone();
tracing::info!(protocol = "tcp", %session_id, "accepted new TCP connection");
@@ -128,7 +125,6 @@ pub async fn task_main(
cancellation_handler,
socket,
ClientMode::Tcp,
endpoint_rate_limiter,
conn_gauge,
)
.instrument(span.clone())
@@ -242,7 +238,6 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
cancellation_handler: Arc<CancellationHandlerMain>,
stream: S,
mode: ClientMode,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
conn_gauge: NumClientConnectionsGuard<'static>,
) -> Result<Option<ProxyPassthrough<CancellationHandlerMainInternal, S>>, ClientRequestError> {
info!(
@@ -288,15 +283,6 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
Err(e) => stream.throw_error(e).await?,
};
// check rate limit
if let Some(ep) = user_info.get_endpoint() {
if !endpoint_rate_limiter.check(ep.normalize(), 1) {
return stream
.throw_error(auth::AuthError::too_many_connections())
.await?;
}
}
let user = user_info.get_user().to_owned();
let user_info = match user_info
.authenticate(

View File

@@ -90,6 +90,7 @@ fn report_error(e: &WakeComputeError, retry: bool) {
WakeComputeError::ApiError(ApiError::Console { .. }) => {
WakeupFailureKind::ApiConsoleOtherError
}
WakeComputeError::TooManyConnections => WakeupFailureKind::ApiConsoleLocked,
WakeComputeError::TimeoutError => WakeupFailureKind::TimeoutError,
};
Metrics::get()

View File

@@ -15,7 +15,7 @@ use rand::{rngs::StdRng, Rng, SeedableRng};
use tokio::time::{Duration, Instant};
use tracing::info;
use crate::EndpointId;
use crate::intern::EndpointIdInt;
pub struct GlobalRateLimiter {
data: Vec<RateBucket>,
@@ -61,12 +61,7 @@ impl GlobalRateLimiter {
// Purposefully ignore user name and database name as clients can reconnect
// with different names, so we'll end up sending some http requests to
// the control plane.
//
// We also may save quite a lot of CPU (I think) by bailing out right after we
// saw SNI, before doing TLS handshake. User-side error messages in that case
// does not look very nice (`SSL SYSCALL error: Undefined error: 0`), so for now
// I went with a more expensive way that yields user-friendlier error messages.
pub type EndpointRateLimiter = BucketRateLimiter<EndpointId, StdRng, RandomState>;
pub type EndpointRateLimiter = BucketRateLimiter<EndpointIdInt, StdRng, RandomState>;
pub struct BucketRateLimiter<Key, Rand = StdRng, Hasher = RandomState> {
map: DashMap<Key, Vec<RateBucket>, Hasher>,
@@ -245,7 +240,7 @@ mod tests {
use tokio::time;
use super::{BucketRateLimiter, EndpointRateLimiter};
use crate::{rate_limiter::RateBucketInfo, EndpointId};
use crate::{intern::EndpointIdInt, rate_limiter::RateBucketInfo, EndpointId};
#[test]
fn rate_bucket_rpi() {
@@ -295,39 +290,40 @@ mod tests {
let limiter = EndpointRateLimiter::new(rates);
let endpoint = EndpointId::from("ep-my-endpoint-1234");
let endpoint = EndpointIdInt::from(endpoint);
time::pause();
for _ in 0..100 {
assert!(limiter.check(endpoint.clone(), 1));
assert!(limiter.check(endpoint, 1));
}
// more connections fail
assert!(!limiter.check(endpoint.clone(), 1));
assert!(!limiter.check(endpoint, 1));
// fail even after 500ms as it's in the same bucket
time::advance(time::Duration::from_millis(500)).await;
assert!(!limiter.check(endpoint.clone(), 1));
assert!(!limiter.check(endpoint, 1));
// after a full 1s, 100 requests are allowed again
time::advance(time::Duration::from_millis(500)).await;
for _ in 1..6 {
for _ in 0..50 {
assert!(limiter.check(endpoint.clone(), 2));
assert!(limiter.check(endpoint, 2));
}
time::advance(time::Duration::from_millis(1000)).await;
}
// more connections after 600 will exceed the 20rps@30s limit
assert!(!limiter.check(endpoint.clone(), 1));
assert!(!limiter.check(endpoint, 1));
// will still fail before the 30 second limit
time::advance(time::Duration::from_millis(30_000 - 6_000 - 1)).await;
assert!(!limiter.check(endpoint.clone(), 1));
assert!(!limiter.check(endpoint, 1));
// after the full 30 seconds, 100 requests are allowed again
time::advance(time::Duration::from_millis(1)).await;
for _ in 0..100 {
assert!(limiter.check(endpoint.clone(), 1));
assert!(limiter.check(endpoint, 1));
}
}

View File

@@ -35,7 +35,6 @@ use crate::context::RequestMonitoring;
use crate::metrics::Metrics;
use crate::protocol2::WithClientIp;
use crate::proxy::run_until_cancelled;
use crate::rate_limiter::EndpointRateLimiter;
use crate::serverless::backend::PoolingBackend;
use crate::serverless::http_util::{api_error_into_response, json_response};
@@ -53,7 +52,6 @@ pub async fn task_main(
config: &'static ProxyConfig,
ws_listener: TcpListener,
cancellation_token: CancellationToken,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
cancellation_handler: Arc<CancellationHandlerMain>,
) -> anyhow::Result<()> {
scopeguard::defer! {
@@ -117,7 +115,6 @@ pub async fn task_main(
backend.clone(),
connections.clone(),
cancellation_handler.clone(),
endpoint_rate_limiter.clone(),
cancellation_token.clone(),
server.clone(),
tls_acceptor.clone(),
@@ -147,7 +144,6 @@ async fn connection_handler(
backend: Arc<PoolingBackend>,
connections: TaskTracker,
cancellation_handler: Arc<CancellationHandlerMain>,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
cancellation_token: CancellationToken,
server: Builder<TokioExecutor>,
tls_acceptor: TlsAcceptor,
@@ -231,7 +227,6 @@ async fn connection_handler(
cancellation_handler.clone(),
session_id,
peer_addr,
endpoint_rate_limiter.clone(),
http_request_token,
)
.in_current_span()
@@ -270,7 +265,6 @@ async fn request_handler(
cancellation_handler: Arc<CancellationHandlerMain>,
session_id: uuid::Uuid,
peer_addr: IpAddr,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
// used to cancel in-flight HTTP requests. not used to cancel websockets
http_cancellation_token: CancellationToken,
) -> Result<Response<Full<Bytes>>, ApiError> {
@@ -298,15 +292,9 @@ async fn request_handler(
ws_connections.spawn(
async move {
if let Err(e) = websocket::serve_websocket(
config,
ctx,
websocket,
cancellation_handler,
host,
endpoint_rate_limiter,
)
.await
if let Err(e) =
websocket::serve_websocket(config, ctx, websocket, cancellation_handler, host)
.await
{
error!("error in websocket connection: {e:#}");
}

View File

@@ -5,7 +5,6 @@ use crate::{
error::{io_error, ReportableError},
metrics::Metrics,
proxy::{handle_client, ClientMode},
rate_limiter::EndpointRateLimiter,
};
use bytes::{Buf, Bytes};
use futures::{Sink, Stream};
@@ -136,7 +135,6 @@ pub async fn serve_websocket(
websocket: HyperWebsocket,
cancellation_handler: Arc<CancellationHandlerMain>,
hostname: Option<String>,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
) -> anyhow::Result<()> {
let websocket = websocket.await?;
let conn_gauge = Metrics::get()
@@ -150,7 +148,6 @@ pub async fn serve_websocket(
cancellation_handler,
WebSocketRw::new(websocket),
ClientMode::Websockets { hostname },
endpoint_rate_limiter,
conn_gauge,
)
.await;

View File

@@ -33,7 +33,7 @@ psutil = "^5.9.4"
types-psutil = "^5.9.5.12"
types-toml = "^0.10.8.6"
pytest-httpserver = "^1.0.8"
aiohttp = "3.9.2"
aiohttp = "3.9.4"
pytest-rerunfailures = "^13.0"
types-pytest-lazy-fixture = "^0.6.3.3"
pytest-split = "^0.8.1"

View File

@@ -25,6 +25,7 @@ async-stream.workspace = true
tokio-stream.workspace = true
futures-util.workspace = true
itertools.workspace = true
camino.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
chrono = { workspace = true, default-features = false, features = ["clock", "serde"] }

View File

@@ -5,6 +5,7 @@ pub mod cloud_admin_api;
pub mod garbage;
pub mod metadata_stream;
pub mod scan_metadata;
pub mod tenant_snapshot;
use std::env;
use std::fmt::Display;
@@ -23,12 +24,12 @@ use aws_sdk_s3::config::{AsyncSleep, Region, SharedAsyncSleep};
use aws_sdk_s3::{Client, Config};
use aws_smithy_async::rt::sleep::TokioSleep;
use camino::{Utf8Path, Utf8PathBuf};
use clap::ValueEnum;
use pageserver::tenant::TENANTS_SEGMENT_NAME;
use pageserver_api::shard::TenantShardId;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use std::io::IsTerminal;
use tokio::io::AsyncReadExt;
use tracing::error;
use tracing_appender::non_blocking::WorkerGuard;
@@ -240,7 +241,6 @@ pub fn init_logging(file_name: &str) -> WorkerGuard {
.with_ansi(false)
.with_writer(file_writer);
let stderr_logs = fmt::Layer::new()
.with_ansi(std::io::stderr().is_terminal())
.with_target(false)
.with_writer(std::io::stderr);
tracing_subscriber::registry()
@@ -396,3 +396,48 @@ async fn download_object_with_retries(
anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
}
async fn download_object_to_file(
s3_client: &Client,
bucket_name: &str,
key: &str,
version_id: Option<&str>,
local_path: &Utf8Path,
) -> anyhow::Result<()> {
let tmp_path = Utf8PathBuf::from(format!("{local_path}.tmp"));
for _ in 0..MAX_RETRIES {
tokio::fs::remove_file(&tmp_path).await.ok();
let mut file = tokio::fs::File::create(&tmp_path)
.await
.context("Opening output file")?;
let request = s3_client.get_object().bucket(bucket_name).key(key);
let request = match version_id {
Some(version_id) => request.version_id(version_id),
None => request,
};
let response_stream = match request.send().await {
Ok(response) => response,
Err(e) => {
error!(
"Failed to download object for key {key} version {}: {e:#}",
version_id.unwrap_or("")
);
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
};
let mut read_stream = response_stream.body.into_async_read();
tokio::io::copy(&mut read_stream, &mut file).await?;
tokio::fs::rename(&tmp_path, local_path).await?;
return Ok(());
}
anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
}

View File

@@ -1,9 +1,12 @@
use camino::Utf8PathBuf;
use pageserver_api::shard::TenantShardId;
use s3_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
use s3_scrubber::scan_metadata::scan_metadata;
use s3_scrubber::tenant_snapshot::SnapshotDownloader;
use s3_scrubber::{init_logging, BucketConfig, ConsoleConfig, NodeKind, TraversingDepth};
use clap::{Parser, Subcommand};
use utils::id::TenantId;
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
@@ -38,6 +41,12 @@ enum Command {
#[arg(long = "tenant-id", num_args = 0..)]
tenant_ids: Vec<TenantShardId>,
},
TenantSnapshot {
#[arg(long = "tenant-id", num_args = 0..)]
tenant_id: TenantId,
#[arg(short, long)]
output_path: Utf8PathBuf,
},
}
#[tokio::main]
@@ -50,6 +59,7 @@ async fn main() -> anyhow::Result<()> {
Command::ScanMetadata { .. } => "scan",
Command::FindGarbage { .. } => "find-garbage",
Command::PurgeGarbage { .. } => "purge-garbage",
Command::TenantSnapshot { .. } => "tenant-snapshot",
};
let _guard = init_logging(&format!(
"{}_{}_{}_{}.log",
@@ -102,5 +112,12 @@ async fn main() -> anyhow::Result<()> {
Command::PurgeGarbage { input_path, mode } => {
purge_garbage(input_path, mode, !cli.delete).await
}
Command::TenantSnapshot {
tenant_id,
output_path,
} => {
let downloader = SnapshotDownloader::new(bucket_config, tenant_id, output_path)?;
downloader.download().await
}
}
}

View File

@@ -5,7 +5,7 @@ use tokio_stream::Stream;
use crate::{list_objects_with_retries, RootTarget, S3Target, TenantShardTimelineId};
use pageserver_api::shard::TenantShardId;
use utils::id::TimelineId;
use utils::id::{TenantId, TimelineId};
/// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2
pub fn stream_tenants<'a>(
@@ -45,6 +45,62 @@ pub fn stream_tenants<'a>(
}
}
pub async fn stream_tenant_shards<'a>(
s3_client: &'a Client,
target: &'a RootTarget,
tenant_id: TenantId,
) -> anyhow::Result<impl Stream<Item = Result<TenantShardId, anyhow::Error>> + 'a> {
let mut tenant_shard_ids: Vec<Result<TenantShardId, anyhow::Error>> = Vec::new();
let mut continuation_token = None;
let shards_target = target.tenant_root(&TenantShardId::unsharded(tenant_id));
loop {
tracing::info!("Listing in {}", shards_target.prefix_in_bucket);
let fetch_response =
list_objects_with_retries(s3_client, &shards_target, continuation_token.clone()).await;
let fetch_response = match fetch_response {
Err(e) => {
tenant_shard_ids.push(Err(e));
break;
}
Ok(r) => r,
};
let new_entry_ids = fetch_response
.common_prefixes()
.iter()
.filter_map(|prefix| prefix.prefix())
.filter_map(|prefix| -> Option<&str> {
prefix
.strip_prefix(&target.tenants_root().prefix_in_bucket)?
.strip_suffix('/')
})
.map(|entry_id_str| {
let first_part = entry_id_str.split('/').next().unwrap();
first_part
.parse::<TenantShardId>()
.with_context(|| format!("Incorrect entry id str: {first_part}"))
});
for i in new_entry_ids {
tenant_shard_ids.push(i);
}
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
Ok(stream! {
for i in tenant_shard_ids {
let id = i?;
yield Ok(id);
}
})
}
/// Given a TenantShardId, output a stream of the timelines within that tenant, discovered
/// using ListObjectsv2. The listing is done before the stream is built, so that this
/// function can be used to generate concurrency on a stream using buffer_unordered.

View File

@@ -0,0 +1,222 @@
use std::sync::Arc;
use crate::checks::{list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData};
use crate::metadata_stream::{stream_tenant_shards, stream_tenant_timelines};
use crate::{
download_object_to_file, init_remote, BucketConfig, NodeKind, RootTarget, S3Target,
TenantShardTimelineId,
};
use anyhow::Context;
use async_stream::stream;
use aws_sdk_s3::Client;
use camino::Utf8PathBuf;
use futures::{StreamExt, TryStreamExt};
use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata;
use pageserver::tenant::storage_layer::LayerFileName;
use pageserver::tenant::IndexPart;
use utils::generation::Generation;
use utils::id::TenantId;
pub struct SnapshotDownloader {
s3_client: Arc<Client>,
s3_root: RootTarget,
bucket_config: BucketConfig,
tenant_id: TenantId,
output_path: Utf8PathBuf,
}
impl SnapshotDownloader {
pub fn new(
bucket_config: BucketConfig,
tenant_id: TenantId,
output_path: Utf8PathBuf,
) -> anyhow::Result<Self> {
let (s3_client, s3_root) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
Ok(Self {
s3_client,
s3_root,
bucket_config,
tenant_id,
output_path,
})
}
async fn download_layer(
&self,
local_path: Utf8PathBuf,
remote_timeline_path: S3Target,
layer_name: LayerFileName,
layer_metadata: IndexLayerMetadata,
) -> anyhow::Result<(LayerFileName, IndexLayerMetadata)> {
// Assumption: we always write layer files atomically, and layer files are immutable. Therefore if the file
// already exists on local disk, we assume it is fully correct and skip it.
if tokio::fs::try_exists(&local_path).await? {
tracing::debug!("{} already exists", local_path);
return Ok((layer_name, layer_metadata));
} else {
tracing::debug!("{} requires download...", local_path);
let remote_layer_path = format!(
"{}{}{}",
remote_timeline_path.prefix_in_bucket,
layer_name.file_name(),
layer_metadata.generation.get_suffix()
);
// List versions: the object might be deleted.
let versions = self
.s3_client
.list_object_versions()
.bucket(self.bucket_config.bucket.clone())
.prefix(&remote_layer_path)
.send()
.await?;
let Some(versions) = versions.versions else {
return Err(anyhow::anyhow!("No versions found for {remote_layer_path}"));
};
let Some(version) = versions.first() else {
return Err(anyhow::anyhow!(
"Empty versions found for {remote_layer_path}"
));
};
download_object_to_file(
&self.s3_client,
&self.bucket_config.bucket,
&remote_layer_path,
version.version_id.as_deref(),
&local_path,
)
.await?;
tracing::debug!("Downloaded successfully to {local_path}");
}
Ok((layer_name, layer_metadata))
}
async fn download_timeline(
&self,
ttid: TenantShardTimelineId,
index_part: IndexPart,
index_part_generation: Generation,
) -> anyhow::Result<()> {
let timeline_root = self.s3_root.timeline_root(&ttid);
let layer_count = index_part.layer_metadata.len();
tracing::info!(
"Downloading {} layers for timeline {ttid}...",
index_part.layer_metadata.len()
);
tokio::fs::create_dir_all(self.output_path.join(format!(
"{}/timelines/{}",
ttid.tenant_shard_id, ttid.timeline_id
)))
.await?;
let index_bytes = serde_json::to_string(&index_part).unwrap();
let layers_stream = stream! {
for (layer_name, layer_metadata) in index_part.layer_metadata {
// Note this is local as in a local copy of S3 data, not local as in the pageserver's local format. They use
// different layer names (remote-style has the generation suffix)
let local_path = self.output_path.join(format!(
"{}/timelines/{}/{}{}",
ttid.tenant_shard_id,
ttid.timeline_id,
layer_name.file_name(),
layer_metadata.generation.get_suffix()
));
yield self.download_layer(local_path, timeline_root.clone(), layer_name, layer_metadata);
}
};
let layer_results = layers_stream.buffered(8);
let mut layer_results = std::pin::pin!(layer_results);
let mut err = None;
let mut download_count = 0;
while let Some(i) = layer_results.next().await {
download_count += 1;
match i {
Ok((layer_name, layer_metadata)) => {
tracing::info!(
"[{download_count}/{layer_count}] OK: {} bytes {ttid} {}",
layer_metadata.file_size,
layer_name.file_name()
);
}
Err(e) => {
// Warn and continue: we will download what we can
tracing::warn!("Download error: {e}");
err = Some(e);
}
}
}
// Write index last, once all the layers it references are downloaded
let local_index_path = self.output_path.join(format!(
"{}/timelines/{}/index_part.json{}",
ttid.tenant_shard_id,
ttid.timeline_id,
index_part_generation.get_suffix()
));
tokio::fs::write(&local_index_path, index_bytes)
.await
.context("writing index")?;
if let Some(e) = err {
tracing::warn!("Some errors occurred, last error: {e}");
Err(e)
} else {
Ok(())
}
}
pub async fn download(&self) -> anyhow::Result<()> {
let (s3_client, target) = init_remote(self.bucket_config.clone(), NodeKind::Pageserver)?;
// Generate a stream of TenantShardId
let shards = stream_tenant_shards(&s3_client, &target, self.tenant_id).await?;
let mut shards = std::pin::pin!(shards);
while let Some(shard) = shards.next().await {
let shard = shard?;
// Generate a stream of TenantTimelineId
let timelines = stream_tenant_timelines(&s3_client, &self.s3_root, shard).await?;
// Generate a stream of S3TimelineBlobData
async fn load_timeline_index(
s3_client: &Client,
target: &RootTarget,
ttid: TenantShardTimelineId,
) -> anyhow::Result<(TenantShardTimelineId, S3TimelineBlobData)> {
let data = list_timeline_blobs(s3_client, ttid, target).await?;
Ok((ttid, data))
}
let timelines = timelines.map_ok(|ttid| load_timeline_index(&s3_client, &target, ttid));
let mut timelines = std::pin::pin!(timelines.try_buffered(8));
while let Some(i) = timelines.next().await {
let (ttid, data) = i?;
match data.blob_data {
BlobDataParseResult::Parsed {
index_part,
index_part_generation,
s3_layers: _,
} => {
self.download_timeline(ttid, index_part, index_part_generation)
.await
.context("Downloading timeline")?;
}
BlobDataParseResult::Relic => {}
BlobDataParseResult::Incorrect(_) => {
tracing::error!("Bad metadata in timeline {ttid}");
}
};
}
}
Ok(())
}
}

View File

@@ -1,6 +1,7 @@
import json
import os
import random
import time
from pathlib import Path
from typing import Any, Dict, Optional
@@ -582,6 +583,91 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
)
def test_secondary_background_downloads(neon_env_builder: NeonEnvBuilder):
"""
Slow test that runs in realtime, checks that the background scheduling of secondary
downloads happens as expected.
"""
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_configs()
env.start()
# Create this many tenants, each with two timelines
tenant_count = 4
tenant_timelines = {}
# This mirrors a constant in `downloader.rs`
freshen_interval_secs = 60
for _i in range(0, tenant_count):
tenant_id = TenantId.generate()
timeline_a = TimelineId.generate()
timeline_b = TimelineId.generate()
env.neon_cli.create_tenant(
tenant_id,
timeline_a,
placement_policy='{"Attached":1}',
# Run with a low heatmap period so that we can avoid having to do synthetic API calls
# to trigger the upload promptly.
conf={"heatmap_period": "1s"},
)
env.neon_cli.create_timeline("main2", tenant_id, timeline_b)
tenant_timelines[tenant_id] = [timeline_a, timeline_b]
t_start = time.time()
# Wait long enough that the background downloads should happen; we expect all the inital layers
# of all the initial timelines to show up on the secondary location of each tenant.
time.sleep(freshen_interval_secs * 1.5)
for tenant_id, timelines in tenant_timelines.items():
attached_to_id = env.storage_controller.locate(tenant_id)[0]["node_id"]
ps_attached = env.get_pageserver(attached_to_id)
# We only have two: the other one must be secondary
ps_secondary = next(p for p in env.pageservers if p != ps_attached)
for timeline_id in timelines:
log.info(f"Checking for secondary timeline {timeline_id} on node {ps_secondary.id}")
# One or more layers should be present for all timelines
assert list_layers(ps_secondary, tenant_id, timeline_id)
# Delete the second timeline: this should be reflected later on the secondary
env.storage_controller.pageserver_api().timeline_delete(tenant_id, timelines[1])
# Wait long enough for the secondary locations to see the deletion
time.sleep(freshen_interval_secs * 1.5)
for tenant_id, timelines in tenant_timelines.items():
attached_to_id = env.storage_controller.locate(tenant_id)[0]["node_id"]
ps_attached = env.get_pageserver(attached_to_id)
# We only have two: the other one must be secondary
ps_secondary = next(p for p in env.pageservers if p != ps_attached)
# This one was not deleted
assert list_layers(ps_secondary, tenant_id, timelines[0])
# This one was deleted
assert not list_layers(ps_secondary, tenant_id, timelines[1])
t_end = time.time()
# Measure how many heatmap downloads we did in total: this checks that we succeeded with
# proper scheduling, and not some bug that just runs downloads in a loop.
total_heatmap_downloads = 0
for ps in env.pageservers:
v = ps.http_client().get_metric_value("pageserver_secondary_download_heatmap_total")
assert v is not None
total_heatmap_downloads += int(v)
download_rate = (total_heatmap_downloads / tenant_count) / (t_end - t_start)
expect_download_rate = 1.0 / freshen_interval_secs
log.info(f"Download rate: {download_rate * 60}/min vs expected {expect_download_rate * 60}/min")
assert download_rate < expect_download_rate * 2
@pytest.mark.skipif(os.environ.get("BUILD_TYPE") == "debug", reason="only run with release build")
@pytest.mark.parametrize("via_controller", [True, False])
def test_slow_secondary_downloads(neon_env_builder: NeonEnvBuilder, via_controller: bool):

View File

@@ -274,7 +274,8 @@ def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up
but imports the generation number.
"""
neon_env_builder.num_pageservers = 2
# One pageserver to simulate legacy environment, two to be managed by storage controller
neon_env_builder.num_pageservers = 3
# Start services by hand so that we can skip registration on one of the pageservers
env = neon_env_builder.init_configs()
@@ -289,10 +290,10 @@ def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up
)
origin_ps = env.pageservers[0]
# This is the pageserver managed by the sharding service, where the tenant
# These are the pageservers managed by the sharding service, where the tenant
# will be attached after onboarding
env.pageservers[1].start()
dest_ps = env.pageservers[1]
env.pageservers[2].start()
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
for sk in env.safekeepers:
@@ -331,6 +332,9 @@ def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up
)
virtual_ps_http.tenant_secondary_download(tenant_id)
warm_up_ps = env.storage_controller.tenant_describe(tenant_id)["shards"][0][
"node_secondary"
][0]
# Call into storage controller to onboard the tenant
generation += 1
@@ -345,6 +349,18 @@ def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up
)
assert len(r["shards"]) == 1
describe = env.storage_controller.tenant_describe(tenant_id)["shards"][0]
dest_ps_id = describe["node_attached"]
dest_ps = env.get_pageserver(dest_ps_id)
if warm_up:
# The storage controller should have attached the tenant to the same placce
# it had a secondary location, otherwise there was no point warming it up
assert dest_ps_id == warm_up_ps
# It should have been given a new secondary location as well
assert len(describe["node_secondary"]) == 1
assert describe["node_secondary"][0] != warm_up_ps
# As if doing a live migration, detach the original pageserver
origin_ps.http_client().tenant_location_conf(
tenant_id,
@@ -416,6 +432,9 @@ def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up
dest_tenant_after_conf_change["generation"] == dest_tenant_before_conf_change["generation"]
)
dest_tenant_conf_after = dest_ps.http_client().tenant_config(tenant_id)
# Storage controller auto-sets heatmap period, ignore it for the comparison
del dest_tenant_conf_after.tenant_specific_overrides["heatmap_period"]
assert dest_tenant_conf_after.tenant_specific_overrides == modified_tenant_conf
env.storage_controller.consistency_check()