Compare commits


8 Commits

Author SHA1 Message Date
Stefan Radig
81ca0d0881 Add skeleton for generalized IP allowlists 2024-04-18 15:29:05 +02:00
John Spray
637ad4a638 pageserver: fix secondary download scheduling (#7396)
## Problem

Some tenants were observed to stop doing downloads after some time

## Summary of changes

- Fix a rogue `<` that was incorrectly scheduling work when `now` was
_before_ the scheduling target, rather than after. This usually resulted
in too-frequent execution, but could also result in never executing, if
the current time has advanced ahead of `next_download` at the time we
call `schedule()` (see the sketch after this list).
- Fix the in-memory list of timelines not being amended after timeline
deletion: this resulted in repeated harmless logs about the timeline
being removed, and redundant calls to remove_dir_all for the timeline
path.
- Add a log at startup to make it easier to see a particular tenant
starting in secondary mode (this is for parity with the logging that
exists when spawning an attached tenant). Previously searching on tenant
ID didn't provide a clear signal as to how the tenant was started during
pageserver start.
- Add a test that exercises secondary downloads using the background
scheduling, whereas existing tests were using the API hook to invoke
download directly.
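
A minimal sketch of the corrected predicate (simplified from the `downloader.rs` change in this diff; `std::time::Instant` stands in for the actual timestamp type):

```rust
use std::time::Instant;

/// Corrected scheduling check: a download is due only once the clock has
/// passed the scheduling target. The rogue `<` inverted this: work fired
/// while `now` was still before the target, and never fired once `now`
/// had passed it.
fn download_due(now: Instant, next_download: Instant) -> bool {
    now > next_download
}
```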
2024-04-18 13:16:03 +01:00
Joonas Koivunen
8d0f701767 feat: copy delta layer prefix or "truncate" (#7228)
For "timeline ancestor merge" or "timeline detach," we need to "cut"
delta layers at a particular LSN. The name "truncate" is not used as it
would imply that a layer file changes in place, instead of what happens: we copy
keys with an Lsn less than a "cut point".

Cc: #6994 

Add the "copy delta layer prefix" operation to DeltaLayerInner, re-using
some of the vectored read internals. The code is `cfg(test)` until it
is used later with a more complete integration test.
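
A minimal sketch of the cut semantics (the real filter in `DeltaLayerInner::copy_prefix` is `lsn < truncate_at`; `u64` stands in here for the crate's `Lsn` type):

```rust
/// An index entry survives the copy only if its LSN is strictly below
/// the cut point.
fn survives_cut(entry_lsn: u64, truncate_at: u64) -> bool {
    entry_lsn < truncate_at
}
```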
2024-04-18 10:43:04 +03:00
Anna Khanova
5191f6ef0e proxy: Record only valid rejected events (#7415)
## Problem

Sometimes the `rejected` metric might record invalid events.

## Summary of changes

* Only record if `rejected` was explicitly set.
* Change order in logs.
* Report metrics if not under high load.
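
A minimal sketch of the tri-state this change introduces (mirroring the `RequestMonitoring` diff further below):

```rust
/// `None` means the endpoint check never ran; the drop-time metric is
/// recorded only for `Some(..)`, i.e. when `set_rejected` was called.
struct MonitoringSketch {
    rejected: Option<bool>,
}

impl MonitoringSketch {
    fn set_rejected(&mut self, rejected: bool) {
        self.rejected = Some(rejected);
    }

    fn record_on_drop(&self) {
        if let Some(rejected) = self.rejected {
            // here the real code increments the invalid_endpoints metric
            // with this `rejected` flag; the sketch just observes it
            let _ = rejected;
        }
    }
}
```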
2024-04-18 06:09:12 +01:00
Conrad Ludgate
a54ea8fb1c proxy: move endpoint rate limiter (#7413)
## Problem

## Summary of changes

Rate limit for wake_compute calls
2024-04-18 06:00:33 +01:00
Anna Khanova
d5708e7435 proxy: Record role to span (#7407)
## Problem

## Summary of changes

Add dbrole to span.
2024-04-17 14:16:11 +02:00
Anna Khanova
fd49005cb3 proxy: Improve logging (#7405)
## Problem

It's unclear from the logs what's going on with the regional redis.

## Summary of changes

Make logs better.
2024-04-17 11:33:31 +00:00
Vlad Lazar
3023de156e pageserver: demote range end fallback log (#7403)
## Problem
This trace is emitted whenever a vectored read touches the end of a
delta layer file. It's a perfectly normal case, but I expected it to be
more rare when implementing the code.

## Summary of changes
Demote log to debug.
2024-04-17 11:32:07 +01:00
21 changed files with 877 additions and 122 deletions

View File

@@ -0,0 +1,62 @@
# Name
Created on 2024.04.16
Implemented on -
## Summary
Neon allows tenants with higher security needs to allowlist IP addresses for access to their database. However, in some cases this is inconvenient because the access comes from ephemeral IP addresses (e.g., from a user's browser). Thus we are searching for an approach that maintains security while making usage less difficult when IP allowlists are enabled.
## Motivation
When IP allowlisting is enabled, only connections from the listed IP addresses are allowed; every other incoming connection is rejected. This increases security when the connection originates from another service run by the tenant which always uses a limited range of IP addresses. However, there are a couple of use cases where this is not true:
- **Console SQL editor**: The connection is originating from a user's browser.
- **Schema-diff**: The connection is originating from a user's browser (*link issue*).
- **Time-travel**:
- **Proxy-link**: (*link issue*)
In those cases a user currently needs to add the originating IP address to the IP allowlist. This requires manually obtaining one's own IP address and adding it to the allowlist (*I believe there is a button for that right now, correct?*). Also, once an IP address is allowlisted, it will in many cases stay in the list because people may forget to delete it once they no longer need it. There is also the danger that they accidentally delete the wrong IP address when they are done.
We should also think about RLS Auth, which might use the same approach.
## Non Goals (if relevant)
## Impacted components (e.g. pageserver, safekeeper, console, etc)
Proxy, console, control plane (*anything else?*)
## Proposed implementation
If possible we would like to create a common implementation for the mentioned cases. However, they might have different security requirements: obtaining a database schema is less security-sensitive than reading or even writing data to and from the database. Below are the different alternatives we are considering:
#### JWT Tokens
A [JWT token](https://auth0.com/docs/secure/tokens/json-web-tokens/json-web-token-structure) is used as a means to authenticate the user to the proxy. The token can hold additional fields, called *claims*. One such claim would carry an IP address and allow the proxy to admit the connection even if the IP address is not allowlisted. For example, in the case of the console SQL editor, a JWT token would be created by the console and sent to the proxy for authentication.
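Below is a sketch of what the proxy-side check could look like, assuming the `jsonwebtoken` crate; the `allowed_ip` claim name is hypothetical, not an existing Neon claim:

```rust
use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation};
use serde::Deserialize;
use std::net::IpAddr;

#[derive(Deserialize)]
struct Claims {
    sub: String,
    exp: usize,
    /// Hypothetical claim binding the token to one client address.
    allowed_ip: IpAddr,
}

/// Admit a connection from a non-allowlisted address only if it presents a
/// valid token whose bound IP matches the peer address (cf. the replay-attack
/// note under "Security implications" below).
fn admit_via_token(token: &str, key: &DecodingKey, peer: IpAddr) -> bool {
    match decode::<Claims>(token, key, &Validation::new(Algorithm::RS256)) {
        Ok(data) => data.claims.allowed_ip == peer,
        Err(_) => false,
    }
}
```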
#### Restricting allowed query types for not allowlisted IP addresses
We could solve the issue for schema-diff by allowing only schema queries when the IP address is not allowlisted. This would solve the schema-diff problem but not the others.
#### Ephemeral nodes have a setting to ignore IP allowlist
For time-travel, at least, we use ephemeral compute nodes. Those could be restricted to read-only and given a setting to ignore the IP allowlist.
### Reliability, failure modes and corner cases (if relevant)
### Interaction/Sequence diagram (if relevant)
### Scalability (if relevant)
### Security implications (if relevant)
- We log when an IP allowlist entry is added: we should also log when a token is created
- To avoid replay attacks, a JWT token should be bound to the IP address
### Unresolved questions (if relevant)
- How does authentication work in the SQL editor right now?
## Alternative implementation (if relevant)
## Pros/cons of proposed approaches (if relevant)
## Definition of Done (if relevant)

View File

@@ -1518,7 +1518,8 @@ pub(crate) struct SecondaryModeMetrics {
pub(crate) download_heatmap: IntCounter,
pub(crate) download_layer: IntCounter,
}
pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| SecondaryModeMetrics {
pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| {
SecondaryModeMetrics {
upload_heatmap: register_int_counter!(
"pageserver_secondary_upload_heatmap",
"Number of heatmaps written to remote storage by attached tenants"
@@ -1536,7 +1537,7 @@ pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| Seco
.expect("failed to define a metric"),
download_heatmap: register_int_counter!(
"pageserver_secondary_download_heatmap",
"Number of downloads of heatmaps by secondary mode locations"
"Number of downloads of heatmaps by secondary mode locations, including when it hasn't changed"
)
.expect("failed to define a metric"),
download_layer: register_int_counter!(
@@ -1544,6 +1545,7 @@ pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| Seco
"Number of downloads of layers by secondary mode locations"
)
.expect("failed to define a metric"),
}
});
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]

View File

@@ -33,6 +33,52 @@ impl Value {
}
}
#[cfg(test)]
#[derive(Debug, PartialEq)]
pub(crate) enum InvalidInput {
TooShortValue,
TooShortPostgresRecord,
}
/// We could have a ValueRef where everything is `serde(borrow)`. Before implementing that, let's
/// use this type for querying if a slice looks a particular way.
#[cfg(test)]
pub(crate) struct ValueBytes;
#[cfg(test)]
impl ValueBytes {
pub(crate) fn will_init(raw: &[u8]) -> Result<bool, InvalidInput> {
if raw.len() < 12 {
return Err(InvalidInput::TooShortValue);
}
let value_discriminator = &raw[0..4];
if value_discriminator == [0, 0, 0, 0] {
// Value::Image always initializes
return Ok(true);
}
if value_discriminator != [0, 0, 0, 1] {
// not a Value::WalRecord(..)
return Ok(false);
}
let walrecord_discriminator = &raw[4..8];
if walrecord_discriminator != [0, 0, 0, 0] {
// only NeonWalRecord::Postgres can have will_init
return Ok(false);
}
if raw.len() < 17 {
return Err(InvalidInput::TooShortPostgresRecord);
}
Ok(raw[8] == 1)
}
}
#[cfg(test)]
mod test {
use super::*;
@@ -70,6 +116,8 @@ mod test {
];
roundtrip!(image, expected);
assert!(ValueBytes::will_init(&expected).unwrap());
}
#[test]
@@ -93,6 +141,96 @@ mod test {
];
roundtrip!(rec, expected);
assert!(ValueBytes::will_init(&expected).unwrap());
}
#[test]
fn bytes_inspection_too_short_image() {
let rec = Value::Image(Bytes::from_static(b""));
#[rustfmt::skip]
let expected = [
// top level discriminator of 4 bytes
0x00, 0x00, 0x00, 0x00,
// 8 byte length
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
];
roundtrip!(rec, expected);
assert!(ValueBytes::will_init(&expected).unwrap());
assert_eq!(expected.len(), 12);
for len in 0..12 {
assert_eq!(
ValueBytes::will_init(&expected[..len]).unwrap_err(),
InvalidInput::TooShortValue
);
}
}
#[test]
fn bytes_inspection_too_short_postgres_record() {
let rec = NeonWalRecord::Postgres {
will_init: false,
rec: Bytes::from_static(b""),
};
let rec = Value::WalRecord(rec);
#[rustfmt::skip]
let expected = [
// flattened discriminator of total 8 bytes
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x00,
// will_init
0x00,
// 8 byte length
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
];
roundtrip!(rec, expected);
assert!(!ValueBytes::will_init(&expected).unwrap());
assert_eq!(expected.len(), 17);
for len in 12..17 {
assert_eq!(
ValueBytes::will_init(&expected[..len]).unwrap_err(),
InvalidInput::TooShortPostgresRecord
)
}
for len in 0..12 {
assert_eq!(
ValueBytes::will_init(&expected[..len]).unwrap_err(),
InvalidInput::TooShortValue
)
}
}
#[test]
fn clear_visibility_map_flags_example() {
let rec = NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno: Some(0x11),
old_heap_blkno: None,
flags: 0x03,
};
let rec = Value::WalRecord(rec);
#[rustfmt::skip]
let expected = [
// discriminators
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x01,
// Some == 1 followed by 4 bytes
0x01, 0x00, 0x00, 0x00, 0x11,
// None == 0
0x00,
// flags
0x03
];
roundtrip!(rec, expected);
assert!(!ValueBytes::will_init(&expected).unwrap());
}
}

View File

@@ -678,12 +678,19 @@ pub async fn init_tenant_mgr(
}
}
}
LocationMode::Secondary(secondary_conf) => TenantSlot::Secondary(SecondaryTenant::new(
tenant_shard_id,
shard_identity,
location_conf.tenant_conf,
&secondary_conf,
)),
LocationMode::Secondary(secondary_conf) => {
info!(
tenant_id = %tenant_shard_id.tenant_id,
shard_id = %tenant_shard_id.shard_slug(),
"Starting secondary tenant"
);
TenantSlot::Secondary(SecondaryTenant::new(
tenant_shard_id,
shard_identity,
location_conf.tenant_conf,
&secondary_conf,
))
}
};
tenants.insert(tenant_shard_id, slot);

View File

@@ -312,7 +312,7 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
(detail.last_download, detail.next_download.unwrap())
};
if now < next_download {
if now > next_download {
Some(PendingDownload {
secondary_state: secondary_tenant,
last_download,
@@ -647,6 +647,12 @@ impl<'a> TenantDownloader<'a> {
progress.bytes_downloaded += layer_byte_count;
progress.layers_downloaded += layer_count;
}
for delete_timeline in &delete_timelines {
// We haven't removed from disk yet, but optimistically remove from in-memory state: if removal
// from disk fails that will be a fatal error.
detail.timelines.remove(delete_timeline);
}
}
// Execute accumulated deletions
@@ -710,13 +716,14 @@ impl<'a> TenantDownloader<'a> {
.await
.map_err(UpdateError::from)?;
SECONDARY_MODE.download_heatmap.inc();
if Some(&download.etag) == prev_etag {
Ok(HeatMapDownload::Unmodified)
} else {
let mut heatmap_bytes = Vec::new();
let mut body = tokio_util::io::StreamReader::new(download.download_stream);
let _size = tokio::io::copy_buf(&mut body, &mut heatmap_bytes).await?;
SECONDARY_MODE.download_heatmap.inc();
Ok(HeatMapDownload::Modified(HeatMapModified {
etag: download.etag,
last_modified: download.last_modified,

View File

@@ -20,8 +20,8 @@
//! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
//! ```
//!
//! Every delta file consists of three parts: "summary", "index", and
//! "values". The summary is a fixed size header at the beginning of the file,
//! Every delta file consists of three parts: "summary", "values", and
//! "index". The summary is a fixed size header at the beginning of the file,
//! and it contains basic information about the layer, and offsets to the other
//! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
//! "values" part. The actual page images and WAL records are stored in the
@@ -863,7 +863,7 @@ impl DeltaLayerInner {
.into(),
);
let data_end_offset = self.index_start_blk as u64 * PAGE_SZ as u64;
let data_end_offset = self.index_start_offset();
let reads = Self::plan_reads(
keyspace,
@@ -939,7 +939,7 @@ impl DeltaLayerInner {
}
if !range_end_handled {
tracing::info!("Handling range end fallback at {}", data_end_offset);
tracing::debug!("Handling range end fallback at {}", data_end_offset);
planner.handle_range_end(data_end_offset);
}
}
@@ -1103,11 +1103,195 @@ impl DeltaLayerInner {
if let Some(last) = all_keys.last_mut() {
// Last key occupies all space till end of value storage,
// which corresponds to beginning of the index
last.size = self.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
last.size = self.index_start_offset() - last.size;
}
Ok(all_keys)
}
/// Using the given writer, write out a truncated version, where LSNs at or above
/// `truncate_at` are missing.
#[cfg(test)]
pub(super) async fn copy_prefix(
&self,
writer: &mut DeltaLayerWriter,
truncate_at: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<()> {
use crate::tenant::vectored_blob_io::{
BlobMeta, VectoredReadBuilder, VectoredReadExtended,
};
use futures::stream::TryStreamExt;
#[derive(Debug)]
enum Item {
Actual(Key, Lsn, BlobRef),
Sentinel,
}
impl From<Item> for Option<(Key, Lsn, BlobRef)> {
fn from(value: Item) -> Self {
match value {
Item::Actual(key, lsn, blob) => Some((key, lsn, blob)),
Item::Sentinel => None,
}
}
}
impl Item {
fn offset(&self) -> Option<BlobRef> {
match self {
Item::Actual(_, _, blob) => Some(*blob),
Item::Sentinel => None,
}
}
fn is_last(&self) -> bool {
matches!(self, Item::Sentinel)
}
}
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
self.index_start_blk,
self.index_root_blk,
block_reader,
);
let stream = self.stream_index_forwards(&tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
// put in a sentinel value for getting the end offset of the last item, without having to
// repeat the whole read part
let stream = stream.chain(futures::stream::once(futures::future::ready(Ok(
Item::Sentinel,
))));
let mut stream = std::pin::pin!(stream);
let mut prev: Option<(Key, Lsn, BlobRef)> = None;
let mut read_builder: Option<VectoredReadBuilder> = None;
let max_read_size = self
.max_vectored_read_bytes
.map(|x| x.0.get())
.unwrap_or(8192);
let mut buffer = Some(BytesMut::with_capacity(max_read_size));
// FIXME: buffering of DeltaLayerWriter
let mut per_blob_copy = Vec::new();
while let Some(item) = stream.try_next().await? {
tracing::debug!(?item, "popped");
let offset = item
.offset()
.unwrap_or(BlobRef::new(self.index_start_offset(), false));
let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
let end_offset = offset;
Some((BlobMeta { key, lsn }, start_offset..end_offset))
} else {
None
};
let is_last = item.is_last();
prev = Option::from(item);
let actionable = actionable.filter(|x| x.0.lsn < truncate_at);
let builder = if let Some((meta, offsets)) = actionable {
// extend or create a new builder
if read_builder
.as_mut()
.map(|x| x.extend(offsets.start.pos(), offsets.end.pos(), meta))
.unwrap_or(VectoredReadExtended::No)
== VectoredReadExtended::Yes
{
None
} else {
read_builder.replace(VectoredReadBuilder::new(
offsets.start.pos(),
offsets.end.pos(),
meta,
max_read_size,
))
}
} else {
// nothing to do, except perhaps flush any existing builder for the last element
None
};
// flush the possible older builder and also the new one if the item was the last one
let builders = builder.into_iter();
let builders = if is_last {
builders.chain(read_builder.take())
} else {
builders.chain(None)
};
for builder in builders {
let read = builder.build();
let reader = VectoredBlobReader::new(&self.file);
let mut buf = buffer.take().unwrap();
buf.clear();
buf.reserve(read.size());
let res = reader.read_blobs(&read, buf).await?;
for blob in res.blobs {
let key = blob.meta.key;
let lsn = blob.meta.lsn;
let data = &res.buf[blob.start..blob.end];
#[cfg(debug_assertions)]
Value::des(data)
.with_context(|| {
format!(
"blob failed to deserialize for {}@{}, {}..{}: {:?}",
blob.meta.key,
blob.meta.lsn,
blob.start,
blob.end,
utils::Hex(data)
)
})
.unwrap();
// is it an image or will_init walrecord?
// FIXME: this could be handled by threading the BlobRef to the
// VectoredReadBuilder
let will_init = crate::repository::ValueBytes::will_init(data)
.inspect_err(|_e| {
#[cfg(feature = "testing")]
tracing::error!(data=?utils::Hex(data), err=?_e, "failed to parse will_init out of serialized value");
})
.unwrap_or(false);
per_blob_copy.clear();
per_blob_copy.extend_from_slice(data);
let (tmp, res) = writer
.put_value_bytes(key, lsn, std::mem::take(&mut per_blob_copy), will_init)
.await;
per_blob_copy = tmp;
res?;
}
buffer = Some(res.buf);
}
}
assert!(
read_builder.is_none(),
"with the sentinel above loop should had handled all"
);
Ok(())
}
pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
println!(
"index_start_blk: {}, root {}",
@@ -1177,6 +1361,44 @@ impl DeltaLayerInner {
Ok(())
}
#[cfg(test)]
fn stream_index_forwards<'a, R>(
&'a self,
reader: &'a DiskBtreeReader<R, DELTA_KEY_SIZE>,
start: &'a [u8; DELTA_KEY_SIZE],
ctx: &'a RequestContext,
) -> impl futures::stream::Stream<
Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
> + 'a
where
R: BlockReader,
{
use futures::stream::TryStreamExt;
let stream = reader.get_stream_from(start, ctx);
stream.map_ok(|(key, value)| {
let key = DeltaKey::from_slice(&key);
let (key, lsn) = (key.key(), key.lsn());
let offset = BlobRef(value);
(key, lsn, offset)
})
}
/// The file offset to the first block of index.
///
/// The file structure is summary, values, and index. We often need this for the size of the last blob.
fn index_start_offset(&self) -> u64 {
let offset = self.index_start_blk as u64 * PAGE_SZ as u64;
let bref = BlobRef(offset);
tracing::debug!(
index_start_blk = self.index_start_blk,
offset,
pos = bref.pos(),
"index_start_offset"
);
offset
}
}
/// A set of data associated with a delta layer key and its value
@@ -1538,7 +1760,7 @@ mod test {
let resident = writer.finish(entries_meta.key_range.end, &timeline).await?;
let inner = resident.get_inner_delta(&ctx).await?;
let inner = resident.as_delta(&ctx).await?;
let file_size = inner.file.metadata().await?.len();
tracing::info!(
@@ -1594,4 +1816,217 @@ mod test {
Ok(())
}
#[tokio::test]
async fn copy_delta_prefix_smoke() {
use crate::walrecord::NeonWalRecord;
use bytes::Bytes;
let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke").unwrap();
let (tenant, ctx) = h.load().await;
let ctx = &ctx;
let timeline = tenant
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, ctx)
.await
.unwrap();
let initdb_layer = timeline
.layers
.read()
.await
.likely_resident_layers()
.next()
.unwrap();
{
let mut writer = timeline.writer().await;
let data = [
(0x20, 12, Value::Image(Bytes::from_static(b"foobar"))),
(
0x30,
12,
Value::WalRecord(NeonWalRecord::Postgres {
will_init: false,
rec: Bytes::from_static(b"1"),
}),
),
(
0x40,
12,
Value::WalRecord(NeonWalRecord::Postgres {
will_init: true,
rec: Bytes::from_static(b"2"),
}),
),
// build an oversized value so we cannot extend an existing read over
// this
(
0x50,
12,
Value::WalRecord(NeonWalRecord::Postgres {
will_init: true,
rec: {
let mut buf =
vec![0u8; tenant.conf.max_vectored_read_bytes.0.get() + 1024];
buf.iter_mut()
.enumerate()
.for_each(|(i, slot)| *slot = (i % 256) as u8);
Bytes::from(buf)
},
}),
),
// because the oversized read cannot be extended further, we are sure to exercise the
// builder created on the last round with this:
(
0x60,
12,
Value::WalRecord(NeonWalRecord::Postgres {
will_init: true,
rec: Bytes::from_static(b"3"),
}),
),
(
0x60,
9,
Value::Image(Bytes::from_static(b"something for a different key")),
),
];
let mut last_lsn = None;
for (lsn, key, value) in data {
let key = Key::from_i128(key);
writer.put(key, Lsn(lsn), &value, ctx).await.unwrap();
last_lsn = Some(lsn);
}
writer.finish_write(Lsn(last_lsn.unwrap()));
}
timeline.freeze_and_flush().await.unwrap();
let new_layer = timeline
.layers
.read()
.await
.likely_resident_layers()
.find(|x| x != &initdb_layer)
.unwrap();
// create a copy for the timeline, so we don't overwrite the file
let branch = tenant
.branch_timeline_test(&timeline, TimelineId::generate(), None, ctx)
.await
.unwrap();
assert_eq!(branch.get_ancestor_lsn(), Lsn(0x60));
// truncating at 0x61 gives us a full copy, otherwise just go backwards until there's just
// a single key
for truncate_at in [0x61, 0x51, 0x41, 0x31, 0x21] {
let truncate_at = Lsn(truncate_at);
let mut writer = DeltaLayerWriter::new(
tenant.conf,
branch.timeline_id,
tenant.tenant_shard_id,
Key::MIN,
Lsn(0x11)..truncate_at,
)
.await
.unwrap();
let new_layer = new_layer.download_and_keep_resident().await.unwrap();
new_layer
.copy_delta_prefix(&mut writer, truncate_at, ctx)
.await
.unwrap();
let copied_layer = writer.finish(Key::MAX, &branch).await.unwrap();
copied_layer.as_delta(ctx).await.unwrap();
assert_keys_and_values_eq(
new_layer.as_delta(ctx).await.unwrap(),
copied_layer.as_delta(ctx).await.unwrap(),
truncate_at,
ctx,
)
.await;
}
}
async fn assert_keys_and_values_eq(
source: &DeltaLayerInner,
truncated: &DeltaLayerInner,
truncated_at: Lsn,
ctx: &RequestContext,
) {
use futures::future::ready;
use futures::stream::TryStreamExt;
let start_key = [0u8; DELTA_KEY_SIZE];
let source_reader = FileBlockReader::new(&source.file, source.file_id);
let source_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
source.index_start_blk,
source.index_root_blk,
&source_reader,
);
let source_stream = source.stream_index_forwards(&source_tree, &start_key, ctx);
let source_stream = source_stream.filter(|res| match res {
Ok((_, lsn, _)) => ready(lsn < &truncated_at),
_ => ready(true),
});
let mut source_stream = std::pin::pin!(source_stream);
let truncated_reader = FileBlockReader::new(&truncated.file, truncated.file_id);
let truncated_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
truncated.index_start_blk,
truncated.index_root_blk,
&truncated_reader,
);
let truncated_stream = truncated.stream_index_forwards(&truncated_tree, &start_key, ctx);
let mut truncated_stream = std::pin::pin!(truncated_stream);
let mut scratch_left = Vec::new();
let mut scratch_right = Vec::new();
loop {
let (src, truncated) = (source_stream.try_next(), truncated_stream.try_next());
let (src, truncated) = tokio::try_join!(src, truncated).unwrap();
if src.is_none() {
assert!(truncated.is_none());
break;
}
let (src, truncated) = (src.unwrap(), truncated.unwrap());
// because we've filtered the source with Lsn, we should always have the same keys from both.
assert_eq!(src.0, truncated.0);
assert_eq!(src.1, truncated.1);
// if this is needed for something else, just drop this assert.
assert!(
src.2.pos() >= truncated.2.pos(),
"value position should not go backwards {} vs. {}",
src.2.pos(),
truncated.2.pos()
);
scratch_left.clear();
let src_cursor = source_reader.block_cursor();
let left = src_cursor.read_blob_into_buf(src.2.pos(), &mut scratch_left, ctx);
scratch_right.clear();
let trunc_cursor = truncated_reader.block_cursor();
let right = trunc_cursor.read_blob_into_buf(truncated.2.pos(), &mut scratch_right, ctx);
tokio::try_join!(left, right).unwrap();
assert_eq!(utils::Hex(&scratch_left), utils::Hex(&scratch_right));
}
}
}

View File

@@ -116,6 +116,12 @@ impl AsLayerDesc for Layer {
}
}
impl PartialEq for Layer {
fn eq(&self, other: &Self) -> bool {
Arc::as_ptr(&self.0) == Arc::as_ptr(&other.0)
}
}
impl Layer {
/// Creates a layer value for a file we know to not be resident.
pub(crate) fn for_evicted(
@@ -1752,6 +1758,28 @@ impl ResidentLayer {
}
}
/// FIXME: truncate is a bad name because we are not truncating anything, but copying the
/// filtered parts.
#[cfg(test)]
pub(super) async fn copy_delta_prefix(
&self,
writer: &mut super::delta_layer::DeltaLayerWriter,
truncate_at: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<()> {
use LayerKind::*;
let owner = &self.owner.0;
match self.downloaded.get(owner, ctx).await? {
Delta(ref d) => d
.copy_prefix(writer, truncate_at, ctx)
.await
.with_context(|| format!("truncate {self}")),
Image(_) => anyhow::bail!(format!("cannot truncate image layer {self}")),
}
}
pub(crate) fn local_path(&self) -> &Utf8Path {
&self.owner.0.path
}
@@ -1761,14 +1789,14 @@ impl ResidentLayer {
}
#[cfg(test)]
pub(crate) async fn get_inner_delta<'a>(
&'a self,
pub(crate) async fn as_delta(
&self,
ctx: &RequestContext,
) -> anyhow::Result<&'a delta_layer::DeltaLayerInner> {
let owner = &self.owner.0;
match self.downloaded.get(owner, ctx).await? {
LayerKind::Delta(d) => Ok(d),
LayerKind::Image(_) => Err(anyhow::anyhow!("Expected a delta layer")),
) -> anyhow::Result<&delta_layer::DeltaLayerInner> {
use LayerKind::*;
match self.downloaded.get(&self.owner.0, ctx).await? {
Delta(ref d) => Ok(d),
Image(_) => Err(anyhow::anyhow!("image layer")),
}
}
}

View File

@@ -61,18 +61,18 @@ pub struct VectoredRead {
}
impl VectoredRead {
pub fn size(&self) -> usize {
pub(crate) fn size(&self) -> usize {
(self.end - self.start) as usize
}
}
#[derive(Eq, PartialEq)]
enum VectoredReadExtended {
pub(crate) enum VectoredReadExtended {
Yes,
No,
}
struct VectoredReadBuilder {
pub(crate) struct VectoredReadBuilder {
start: u64,
end: u64,
blobs_at: VecMap<u64, BlobMeta>,
@@ -80,7 +80,17 @@ struct VectoredReadBuilder {
}
impl VectoredReadBuilder {
fn new(start_offset: u64, end_offset: u64, meta: BlobMeta, max_read_size: usize) -> Self {
/// Start building a new vectored read.
///
/// Note that by design, this does not check against reading more than `max_read_size`, in
/// order to support reading blobs larger than the configured value. The builder will,
/// however, be single-use after that.
pub(crate) fn new(
start_offset: u64,
end_offset: u64,
meta: BlobMeta,
max_read_size: usize,
) -> Self {
let mut blobs_at = VecMap::default();
blobs_at
.append(start_offset, meta)
@@ -97,7 +107,8 @@ impl VectoredReadBuilder {
/// Attempt to extend the current read with a new blob if the start
/// offset matches with the current end of the vectored read
/// and the resulting size is below the max read size
fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
tracing::trace!(start, end, "trying to extend");
let size = (end - start) as usize;
if self.end == start && self.size() + size <= self.max_read_size {
self.end = end;
@@ -111,11 +122,11 @@ impl VectoredReadBuilder {
VectoredReadExtended::No
}
fn size(&self) -> usize {
pub(crate) fn size(&self) -> usize {
(self.end - self.start) as usize
}
fn build(self) -> VectoredRead {
pub(crate) fn build(self) -> VectoredRead {
VectoredRead {
start: self.start,
end: self.end,
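
The doc comments added above describe the builder's coalescing rule; the following self-contained sketch, with simplified stand-in types rather than the crate's actual `VectoredReadBuilder`, illustrates it: a single oversized blob is always accepted by `new`, while `extend` refuses to grow a read past the budget.

```rust
/// Simplified stand-in for the builder: offsets only, no blob metadata.
struct CoalesceSketch {
    start: u64,
    end: u64,
    max_read_size: usize,
}

impl CoalesceSketch {
    /// Deliberately no size check: a single blob larger than the budget
    /// still gets its own (single-use) read.
    fn new(start: u64, end: u64, max_read_size: usize) -> Self {
        CoalesceSketch { start, end, max_read_size }
    }

    /// Extend only if the new blob is contiguous and the grown read stays
    /// within the budget; otherwise the caller starts a fresh builder.
    fn extend(&mut self, start: u64, end: u64) -> bool {
        let grown = (end - self.start) as usize;
        if self.end == start && grown <= self.max_read_size {
            self.end = end;
            return true;
        }
        false
    }
}
```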

View File

@@ -55,6 +55,7 @@ impl NeonWalRecord {
/// Does replaying this WAL record initialize the page from scratch, or does
/// it need to be applied over the previous image of the page?
pub fn will_init(&self) -> bool {
// If you change this function, you'll also need to change ValueBytes::will_init
match self {
NeonWalRecord::Postgres { will_init, rec: _ } => *will_init,

View File

@@ -331,7 +331,6 @@ async fn main() -> anyhow::Result<()> {
let proxy_listener = TcpListener::bind(proxy_address).await?;
let cancellation_token = CancellationToken::new();
let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new(&config.endpoint_rps_limit));
let cancel_map = CancelMap::default();
let redis_publisher = match &regional_redis_client {
@@ -357,7 +356,6 @@ async fn main() -> anyhow::Result<()> {
config,
proxy_listener,
cancellation_token.clone(),
endpoint_rate_limiter.clone(),
cancellation_handler.clone(),
));
@@ -372,7 +370,6 @@ async fn main() -> anyhow::Result<()> {
config,
serverless_listener,
cancellation_token.clone(),
endpoint_rate_limiter.clone(),
cancellation_handler.clone(),
));
}
@@ -533,7 +530,11 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
let url = args.auth_endpoint.parse()?;
let endpoint = http::Endpoint::new(url, http::new_client());
let api = console::provider::neon::Api::new(endpoint, caches, locks);
let mut endpoint_rps_limit = args.endpoint_rps_limit.clone();
RateBucketInfo::validate(&mut endpoint_rps_limit)?;
let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new(endpoint_rps_limit));
let api =
console::provider::neon::Api::new(endpoint, caches, locks, endpoint_rate_limiter);
let api = console::provider::ConsoleBackend::Console(api);
auth::BackendType::Console(MaybeOwned::Owned(api), ())
}
@@ -567,8 +568,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
rate_limit_ip_subnet: args.auth_rate_limit_ip_subnet,
};
let mut endpoint_rps_limit = args.endpoint_rps_limit.clone();
RateBucketInfo::validate(&mut endpoint_rps_limit)?;
let mut redis_rps_limit = args.redis_rps_limit.clone();
RateBucketInfo::validate(&mut redis_rps_limit)?;
@@ -581,7 +580,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
authentication_config,
require_client_ip: args.require_client_ip,
disable_ip_check_for_http: args.disable_ip_check_for_http,
endpoint_rps_limit,
redis_rps_limit,
handshake_timeout: args.handshake_timeout,
region: args.region.clone(),

View File

@@ -70,20 +70,14 @@ impl EndpointsCache {
if !self.ready.load(Ordering::Acquire) {
return true;
}
// If cache is disabled, just collect the metrics and return.
if self.config.disable_cache {
let rejected = self.should_reject(endpoint);
ctx.set_rejected(rejected);
info!(?rejected, "check endpoint is valid, disabled cache");
return true;
}
// If the limiter allows, we don't need to check the cache.
if self.limiter.lock().await.check() {
return true;
}
let rejected = self.should_reject(endpoint);
info!(?rejected, "check endpoint is valid, enabled cache");
ctx.set_rejected(rejected);
info!(?rejected, "check endpoint is valid, disabled cache");
// If cache is disabled, just collect the metrics and return or
// If the limiter allows, we don't need to check the cache.
if self.config.disable_cache || self.limiter.lock().await.check() {
return true;
}
!rejected
}
fn should_reject(&self, endpoint: &EndpointId) -> bool {

View File

@@ -29,7 +29,6 @@ pub struct ProxyConfig {
pub authentication_config: AuthenticationConfig,
pub require_client_ip: bool,
pub disable_ip_check_for_http: bool,
pub endpoint_rps_limit: Vec<RateBucketInfo>,
pub redis_rps_limit: Vec<RateBucketInfo>,
pub region: String,
pub handshake_timeout: Duration,

View File

@@ -208,6 +208,9 @@ pub mod errors {
#[error(transparent)]
ApiError(ApiError),
#[error("Too many connections attempts")]
TooManyConnections,
#[error("Timeout waiting to acquire wake compute lock")]
TimeoutError,
}
@@ -240,6 +243,8 @@ pub mod errors {
// However, API might return a meaningful error.
ApiError(e) => e.to_string_client(),
TooManyConnections => self.to_string(),
TimeoutError => "timeout while acquiring the compute resource lock".to_owned(),
}
}
@@ -250,6 +255,7 @@ pub mod errors {
match self {
WakeComputeError::BadComputeAddress(_) => crate::error::ErrorKind::ControlPlane,
WakeComputeError::ApiError(e) => e.get_error_kind(),
WakeComputeError::TooManyConnections => crate::error::ErrorKind::RateLimit,
WakeComputeError::TimeoutError => crate::error::ErrorKind::ServiceRateLimit,
}
}

View File

@@ -12,6 +12,7 @@ use crate::{
console::messages::ColdStartInfo,
http,
metrics::{CacheOutcome, Metrics},
rate_limiter::EndpointRateLimiter,
scram, Normalize,
};
use crate::{cache::Cached, context::RequestMonitoring};
@@ -25,6 +26,7 @@ pub struct Api {
endpoint: http::Endpoint,
pub caches: &'static ApiCaches,
pub locks: &'static ApiLocks,
pub endpoint_rate_limiter: Arc<EndpointRateLimiter>,
jwt: String,
}
@@ -34,6 +36,7 @@ impl Api {
endpoint: http::Endpoint,
caches: &'static ApiCaches,
locks: &'static ApiLocks,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
) -> Self {
let jwt: String = match std::env::var("NEON_PROXY_TO_CONTROLPLANE_TOKEN") {
Ok(v) => v,
@@ -43,6 +46,7 @@ impl Api {
endpoint,
caches,
locks,
endpoint_rate_limiter,
jwt,
}
}
@@ -277,6 +281,14 @@ impl super::Api for Api {
return Ok(cached);
}
// check rate limit
if !self
.endpoint_rate_limiter
.check(user_info.endpoint.normalize().into(), 1)
{
return Err(WakeComputeError::TooManyConnections);
}
let permit = self.locks.get_wake_compute_permit(&key).await?;
// after getting back a permit - it's possible the cache was filled

View File

@@ -51,7 +51,7 @@ pub struct RequestMonitoring {
sender: Option<mpsc::UnboundedSender<RequestData>>,
pub latency_timer: LatencyTimer,
// Whether the proxy decided that it's not a valid endpoint and rejected it before going to cplane.
rejected: bool,
rejected: Option<bool>,
}
#[derive(Clone, Debug)]
@@ -96,7 +96,7 @@ impl RequestMonitoring {
error_kind: None,
auth_method: None,
success: false,
rejected: false,
rejected: None,
cold_start_info: ColdStartInfo::Unknown,
sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
@@ -118,7 +118,7 @@ impl RequestMonitoring {
}
pub fn set_rejected(&mut self, rejected: bool) {
self.rejected = rejected;
self.rejected = Some(rejected);
}
pub fn set_cold_start_info(&mut self, info: ColdStartInfo) {
@@ -200,27 +200,28 @@ impl Drop for RequestMonitoring {
} else {
ConnectOutcome::Failed
};
let rejected = self.rejected;
let ep = self
.endpoint_id
.as_ref()
.map(|x| x.as_str())
.unwrap_or_default();
// This makes sense only if cache is disabled
info!(
?ep,
?outcome,
?rejected,
"check endpoint is valid with outcome"
);
Metrics::get()
.proxy
.invalid_endpoints_total
.inc(InvalidEndpointsGroup {
protocol: self.protocol,
rejected: rejected.into(),
outcome,
});
if let Some(rejected) = self.rejected {
let ep = self
.endpoint_id
.as_ref()
.map(|x| x.as_str())
.unwrap_or_default();
// This makes sense only if cache is disabled
info!(
?outcome,
?rejected,
?ep,
"check endpoint is valid with outcome"
);
Metrics::get()
.proxy
.invalid_endpoints_total
.inc(InvalidEndpointsGroup {
protocol: self.protocol,
rejected: rejected.into(),
outcome,
});
}
if let Some(tx) = self.sender.take() {
let _: Result<(), _> = tx.send(RequestData::from(&*self));
}

View File

@@ -19,9 +19,8 @@ use crate::{
metrics::{Metrics, NumClientConnectionsGuard},
protocol2::WithClientIp,
proxy::handshake::{handshake, HandshakeData},
rate_limiter::EndpointRateLimiter,
stream::{PqStream, Stream},
EndpointCacheKey, Normalize,
EndpointCacheKey,
};
use futures::TryFutureExt;
use itertools::Itertools;
@@ -61,7 +60,6 @@ pub async fn task_main(
config: &'static ProxyConfig,
listener: tokio::net::TcpListener,
cancellation_token: CancellationToken,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
cancellation_handler: Arc<CancellationHandlerMain>,
) -> anyhow::Result<()> {
scopeguard::defer! {
@@ -86,7 +84,6 @@ pub async fn task_main(
let session_id = uuid::Uuid::new_v4();
let cancellation_handler = Arc::clone(&cancellation_handler);
let endpoint_rate_limiter = endpoint_rate_limiter.clone();
tracing::info!(protocol = "tcp", %session_id, "accepted new TCP connection");
@@ -128,7 +125,6 @@ pub async fn task_main(
cancellation_handler,
socket,
ClientMode::Tcp,
endpoint_rate_limiter,
conn_gauge,
)
.instrument(span.clone())
@@ -242,7 +238,6 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
cancellation_handler: Arc<CancellationHandlerMain>,
stream: S,
mode: ClientMode,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
conn_gauge: NumClientConnectionsGuard<'static>,
) -> Result<Option<ProxyPassthrough<CancellationHandlerMainInternal, S>>, ClientRequestError> {
info!(
@@ -288,15 +283,6 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
Err(e) => stream.throw_error(e).await?,
};
// check rate limit
if let Some(ep) = user_info.get_endpoint() {
if !endpoint_rate_limiter.check(ep.normalize(), 1) {
return stream
.throw_error(auth::AuthError::too_many_connections())
.await?;
}
}
let user = user_info.get_user().to_owned();
let user_info = match user_info
.authenticate(

View File

@@ -90,6 +90,7 @@ fn report_error(e: &WakeComputeError, retry: bool) {
WakeComputeError::ApiError(ApiError::Console { .. }) => {
WakeupFailureKind::ApiConsoleOtherError
}
WakeComputeError::TooManyConnections => WakeupFailureKind::ApiConsoleLocked,
WakeComputeError::TimeoutError => WakeupFailureKind::TimeoutError,
};
Metrics::get()

View File

@@ -15,7 +15,7 @@ use rand::{rngs::StdRng, Rng, SeedableRng};
use tokio::time::{Duration, Instant};
use tracing::info;
use crate::EndpointId;
use crate::intern::EndpointIdInt;
pub struct GlobalRateLimiter {
data: Vec<RateBucket>,
@@ -61,12 +61,7 @@ impl GlobalRateLimiter {
// Purposefully ignore user name and database name as clients can reconnect
// with different names, so we'll end up sending some http requests to
// the control plane.
//
// We also may save quite a lot of CPU (I think) by bailing out right after we
// saw SNI, before doing TLS handshake. User-side error messages in that case
// does not look very nice (`SSL SYSCALL error: Undefined error: 0`), so for now
// I went with a more expensive way that yields user-friendlier error messages.
pub type EndpointRateLimiter = BucketRateLimiter<EndpointId, StdRng, RandomState>;
pub type EndpointRateLimiter = BucketRateLimiter<EndpointIdInt, StdRng, RandomState>;
pub struct BucketRateLimiter<Key, Rand = StdRng, Hasher = RandomState> {
map: DashMap<Key, Vec<RateBucket>, Hasher>,
@@ -245,7 +240,7 @@ mod tests {
use tokio::time;
use super::{BucketRateLimiter, EndpointRateLimiter};
use crate::{rate_limiter::RateBucketInfo, EndpointId};
use crate::{intern::EndpointIdInt, rate_limiter::RateBucketInfo, EndpointId};
#[test]
fn rate_bucket_rpi() {
@@ -295,39 +290,40 @@ mod tests {
let limiter = EndpointRateLimiter::new(rates);
let endpoint = EndpointId::from("ep-my-endpoint-1234");
let endpoint = EndpointIdInt::from(endpoint);
time::pause();
for _ in 0..100 {
assert!(limiter.check(endpoint.clone(), 1));
assert!(limiter.check(endpoint, 1));
}
// more connections fail
assert!(!limiter.check(endpoint.clone(), 1));
assert!(!limiter.check(endpoint, 1));
// fail even after 500ms as it's in the same bucket
time::advance(time::Duration::from_millis(500)).await;
assert!(!limiter.check(endpoint.clone(), 1));
assert!(!limiter.check(endpoint, 1));
// after a full 1s, 100 requests are allowed again
time::advance(time::Duration::from_millis(500)).await;
for _ in 1..6 {
for _ in 0..50 {
assert!(limiter.check(endpoint.clone(), 2));
assert!(limiter.check(endpoint, 2));
}
time::advance(time::Duration::from_millis(1000)).await;
}
// more connections after 600 will exceed the 20rps@30s limit
assert!(!limiter.check(endpoint.clone(), 1));
assert!(!limiter.check(endpoint, 1));
// will still fail before the 30 second limit
time::advance(time::Duration::from_millis(30_000 - 6_000 - 1)).await;
assert!(!limiter.check(endpoint.clone(), 1));
assert!(!limiter.check(endpoint, 1));
// after the full 30 seconds, 100 requests are allowed again
time::advance(time::Duration::from_millis(1)).await;
for _ in 0..100 {
assert!(limiter.check(endpoint.clone(), 1));
assert!(limiter.check(endpoint, 1));
}
}

View File

@@ -35,7 +35,6 @@ use crate::context::RequestMonitoring;
use crate::metrics::Metrics;
use crate::protocol2::WithClientIp;
use crate::proxy::run_until_cancelled;
use crate::rate_limiter::EndpointRateLimiter;
use crate::serverless::backend::PoolingBackend;
use crate::serverless::http_util::{api_error_into_response, json_response};
@@ -53,7 +52,6 @@ pub async fn task_main(
config: &'static ProxyConfig,
ws_listener: TcpListener,
cancellation_token: CancellationToken,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
cancellation_handler: Arc<CancellationHandlerMain>,
) -> anyhow::Result<()> {
scopeguard::defer! {
@@ -117,7 +115,6 @@ pub async fn task_main(
backend.clone(),
connections.clone(),
cancellation_handler.clone(),
endpoint_rate_limiter.clone(),
cancellation_token.clone(),
server.clone(),
tls_acceptor.clone(),
@@ -147,7 +144,6 @@ async fn connection_handler(
backend: Arc<PoolingBackend>,
connections: TaskTracker,
cancellation_handler: Arc<CancellationHandlerMain>,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
cancellation_token: CancellationToken,
server: Builder<TokioExecutor>,
tls_acceptor: TlsAcceptor,
@@ -231,7 +227,6 @@ async fn connection_handler(
cancellation_handler.clone(),
session_id,
peer_addr,
endpoint_rate_limiter.clone(),
http_request_token,
)
.in_current_span()
@@ -270,7 +265,6 @@ async fn request_handler(
cancellation_handler: Arc<CancellationHandlerMain>,
session_id: uuid::Uuid,
peer_addr: IpAddr,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
// used to cancel in-flight HTTP requests. not used to cancel websockets
http_cancellation_token: CancellationToken,
) -> Result<Response<Full<Bytes>>, ApiError> {
@@ -298,15 +292,9 @@ async fn request_handler(
ws_connections.spawn(
async move {
if let Err(e) = websocket::serve_websocket(
config,
ctx,
websocket,
cancellation_handler,
host,
endpoint_rate_limiter,
)
.await
if let Err(e) =
websocket::serve_websocket(config, ctx, websocket, cancellation_handler, host)
.await
{
error!("error in websocket connection: {e:#}");
}

View File

@@ -5,7 +5,6 @@ use crate::{
error::{io_error, ReportableError},
metrics::Metrics,
proxy::{handle_client, ClientMode},
rate_limiter::EndpointRateLimiter,
};
use bytes::{Buf, Bytes};
use futures::{Sink, Stream};
@@ -136,7 +135,6 @@ pub async fn serve_websocket(
websocket: HyperWebsocket,
cancellation_handler: Arc<CancellationHandlerMain>,
hostname: Option<String>,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
) -> anyhow::Result<()> {
let websocket = websocket.await?;
let conn_gauge = Metrics::get()
@@ -150,7 +148,6 @@ pub async fn serve_websocket(
cancellation_handler,
WebSocketRw::new(websocket),
ClientMode::Websockets { hostname },
endpoint_rate_limiter,
conn_gauge,
)
.await;

View File

@@ -1,6 +1,7 @@
import json
import os
import random
import time
from pathlib import Path
from typing import Any, Dict, Optional
@@ -582,6 +583,91 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
)
def test_secondary_background_downloads(neon_env_builder: NeonEnvBuilder):
"""
Slow test that runs in real time and checks that the background scheduling of secondary
downloads happens as expected.
"""
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_configs()
env.start()
# Create this many tenants, each with two timelines
tenant_count = 4
tenant_timelines = {}
# This mirrors a constant in `downloader.rs`
freshen_interval_secs = 60
for _i in range(0, tenant_count):
tenant_id = TenantId.generate()
timeline_a = TimelineId.generate()
timeline_b = TimelineId.generate()
env.neon_cli.create_tenant(
tenant_id,
timeline_a,
placement_policy='{"Attached":1}',
# Run with a low heatmap period so that we can avoid having to do synthetic API calls
# to trigger the upload promptly.
conf={"heatmap_period": "1s"},
)
env.neon_cli.create_timeline("main2", tenant_id, timeline_b)
tenant_timelines[tenant_id] = [timeline_a, timeline_b]
t_start = time.time()
# Wait long enough that the background downloads should happen; we expect all the initial layers
# of all the initial timelines to show up on the secondary location of each tenant.
time.sleep(freshen_interval_secs * 1.5)
for tenant_id, timelines in tenant_timelines.items():
attached_to_id = env.storage_controller.locate(tenant_id)[0]["node_id"]
ps_attached = env.get_pageserver(attached_to_id)
# We only have two: the other one must be secondary
ps_secondary = next(p for p in env.pageservers if p != ps_attached)
for timeline_id in timelines:
log.info(f"Checking for secondary timeline {timeline_id} on node {ps_secondary.id}")
# One or more layers should be present for all timelines
assert list_layers(ps_secondary, tenant_id, timeline_id)
# Delete the second timeline: this should be reflected later on the secondary
env.storage_controller.pageserver_api().timeline_delete(tenant_id, timelines[1])
# Wait long enough for the secondary locations to see the deletion
time.sleep(freshen_interval_secs * 1.5)
for tenant_id, timelines in tenant_timelines.items():
attached_to_id = env.storage_controller.locate(tenant_id)[0]["node_id"]
ps_attached = env.get_pageserver(attached_to_id)
# We only have two: the other one must be secondary
ps_secondary = next(p for p in env.pageservers if p != ps_attached)
# This one was not deleted
assert list_layers(ps_secondary, tenant_id, timelines[0])
# This one was deleted
assert not list_layers(ps_secondary, tenant_id, timelines[1])
t_end = time.time()
# Measure how many heatmap downloads we did in total: this checks that we succeeded with
# proper scheduling, and not some bug that just runs downloads in a loop.
total_heatmap_downloads = 0
for ps in env.pageservers:
v = ps.http_client().get_metric_value("pageserver_secondary_download_heatmap_total")
assert v is not None
total_heatmap_downloads += int(v)
download_rate = (total_heatmap_downloads / tenant_count) / (t_end - t_start)
expect_download_rate = 1.0 / freshen_interval_secs
log.info(f"Download rate: {download_rate * 60}/min vs expected {expect_download_rate * 60}/min")
assert download_rate < expect_download_rate * 2
@pytest.mark.skipif(os.environ.get("BUILD_TYPE") == "debug", reason="only run with release build")
@pytest.mark.parametrize("via_controller", [True, False])
def test_slow_secondary_downloads(neon_env_builder: NeonEnvBuilder, via_controller: bool):