mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-24 05:40:36 +00:00
Compare commits
8 Commits
proxy-impr
...
proxy_ip_a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
81ca0d0881 | ||
|
|
637ad4a638 | ||
|
|
8d0f701767 | ||
|
|
5191f6ef0e | ||
|
|
a54ea8fb1c | ||
|
|
d5708e7435 | ||
|
|
fd49005cb3 | ||
|
|
3023de156e |
62
docs/rfcs/033-proxy-generalized-ip-allowlists.md
Normal file
62
docs/rfcs/033-proxy-generalized-ip-allowlists.md
Normal file
@@ -0,0 +1,62 @@
|
||||
# Name
|
||||
Created on 2024.04.16
|
||||
|
||||
Implemented on -
|
||||
|
||||
## Summary
|
||||
|
||||
Neon allows tenants with higher security needs to allowlist IP addresses for access to their database. However, in some cases it is inconvenient because the access comes from ephemeral IP addresses (e.g., from a user's browser). Thus we are searching for a way which maintains security while also making the usage less difficult when IP allowlists are used.
|
||||
|
||||
## Motivation
|
||||
|
||||
When IP allowlisting is enabled only connections from the listed IP addresses are allowed. Every other incoming connection is rejected. This is increasing security when the connection is originating from another service run by the tenants which always uses a limited range of IP addresses. However there are a couple of use cases where this is not true:
|
||||
- **Console SQL editor**: The connection is orginating from a user's browser.
|
||||
- **Schema-diff**: The connection is orginating from a user's browser (*link issue*).
|
||||
- **Time-travel**:
|
||||
- **Proxy-link**: (*link issue*)
|
||||
|
||||
In those cases a user currently needs to add the originating IP address to the IP allowlist. This requires manually obtaining the own IP address and adding it to the IP allowlist (*I believe there is a button for that right now, correct?*). Also once an IP address is allowlisted it will in many cases stay in the list because people may forget to delete it after they do not use it anymore. There is also the danger that they accidentally delete the wrong IP address when they are done.
|
||||
|
||||
Also we should think about RLS Auth which probably will might use the same approach.
|
||||
|
||||
## Non Goals (if relevant)
|
||||
|
||||
## Impacted components (e.g. pageserver, safekeeper, console, etc)
|
||||
|
||||
Proxy, console, control plane (*anything else?)
|
||||
|
||||
## Proposed implementation
|
||||
|
||||
If possible we would like to create a common implementation for the mentioned cases. However they might have different security requirements: Obtaining a database schema is less security sensitive than reading or even writing data to and from the database. Below are the different alternatives we consider:
|
||||
|
||||
#### JWT Tokens
|
||||
|
||||
A [JWT token](https://auth0.com/docs/secure/tokens/json-web-tokens/json-web-token-structure) is used as a means to authenticate the user to the proxy. The token can hold additional fields (*how are they called?*). One such field would give an IP address and allow the proxy to admit the access even if the IP address is not allowlisted. For example in the case of the console SQL editor a JWT token would be created by the console and sent to the proxy for authentication.
|
||||
|
||||
#### Restricting allowed query types for not allowlisted IP addresses
|
||||
|
||||
We could solve the issue for schema-diff by allowing only the schema queries if the IP address is not allowlisted. This would solve the schema-diff problem but no other.
|
||||
|
||||
#### Ephemeral nodes have a setting to ignore IP allowlist
|
||||
|
||||
For at least time-travel we use ephemeral compute nodes. Those could be restricted to be read-only
|
||||
|
||||
### Reliability, failure modes and corner cases (if relevant)
|
||||
|
||||
### Interaction/Sequence diagram (if relevant)
|
||||
|
||||
### Scalability (if relevant)
|
||||
|
||||
### Security implications (if relevant)
|
||||
- We are logging when an IP allowlist is added: we should also log when a token is created
|
||||
- To avoid replay attacks a JWT token should be bound to the IP address
|
||||
|
||||
### Unresolved questions (if relevant)
|
||||
|
||||
- In SQL editor how is authentciation working right now?
|
||||
|
||||
## Alternative implementation (if relevant)
|
||||
|
||||
## Pros/cons of proposed approaches (if relevant)
|
||||
|
||||
## Definition of Done (if relevant)
|
||||
@@ -1518,7 +1518,8 @@ pub(crate) struct SecondaryModeMetrics {
|
||||
pub(crate) download_heatmap: IntCounter,
|
||||
pub(crate) download_layer: IntCounter,
|
||||
}
|
||||
pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| SecondaryModeMetrics {
|
||||
pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| {
|
||||
SecondaryModeMetrics {
|
||||
upload_heatmap: register_int_counter!(
|
||||
"pageserver_secondary_upload_heatmap",
|
||||
"Number of heatmaps written to remote storage by attached tenants"
|
||||
@@ -1536,7 +1537,7 @@ pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| Seco
|
||||
.expect("failed to define a metric"),
|
||||
download_heatmap: register_int_counter!(
|
||||
"pageserver_secondary_download_heatmap",
|
||||
"Number of downloads of heatmaps by secondary mode locations"
|
||||
"Number of downloads of heatmaps by secondary mode locations, including when it hasn't changed"
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
download_layer: register_int_counter!(
|
||||
@@ -1544,6 +1545,7 @@ pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| Seco
|
||||
"Number of downloads of layers by secondary mode locations"
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
}
|
||||
});
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
|
||||
@@ -33,6 +33,52 @@ impl Value {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub(crate) enum InvalidInput {
|
||||
TooShortValue,
|
||||
TooShortPostgresRecord,
|
||||
}
|
||||
|
||||
/// We could have a ValueRef where everything is `serde(borrow)`. Before implementing that, lets
|
||||
/// use this type for querying if a slice looks some particular way.
|
||||
#[cfg(test)]
|
||||
pub(crate) struct ValueBytes;
|
||||
|
||||
#[cfg(test)]
|
||||
impl ValueBytes {
|
||||
pub(crate) fn will_init(raw: &[u8]) -> Result<bool, InvalidInput> {
|
||||
if raw.len() < 12 {
|
||||
return Err(InvalidInput::TooShortValue);
|
||||
}
|
||||
|
||||
let value_discriminator = &raw[0..4];
|
||||
|
||||
if value_discriminator == [0, 0, 0, 0] {
|
||||
// Value::Image always initializes
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
if value_discriminator != [0, 0, 0, 1] {
|
||||
// not a Value::WalRecord(..)
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let walrecord_discriminator = &raw[4..8];
|
||||
|
||||
if walrecord_discriminator != [0, 0, 0, 0] {
|
||||
// only NeonWalRecord::Postgres can have will_init
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
if raw.len() < 17 {
|
||||
return Err(InvalidInput::TooShortPostgresRecord);
|
||||
}
|
||||
|
||||
Ok(raw[8] == 1)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
@@ -70,6 +116,8 @@ mod test {
|
||||
];
|
||||
|
||||
roundtrip!(image, expected);
|
||||
|
||||
assert!(ValueBytes::will_init(&expected).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -93,6 +141,96 @@ mod test {
|
||||
];
|
||||
|
||||
roundtrip!(rec, expected);
|
||||
|
||||
assert!(ValueBytes::will_init(&expected).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bytes_inspection_too_short_image() {
|
||||
let rec = Value::Image(Bytes::from_static(b""));
|
||||
|
||||
#[rustfmt::skip]
|
||||
let expected = [
|
||||
// top level discriminator of 4 bytes
|
||||
0x00, 0x00, 0x00, 0x00,
|
||||
// 8 byte length
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
];
|
||||
|
||||
roundtrip!(rec, expected);
|
||||
|
||||
assert!(ValueBytes::will_init(&expected).unwrap());
|
||||
assert_eq!(expected.len(), 12);
|
||||
for len in 0..12 {
|
||||
assert_eq!(
|
||||
ValueBytes::will_init(&expected[..len]).unwrap_err(),
|
||||
InvalidInput::TooShortValue
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bytes_inspection_too_short_postgres_record() {
|
||||
let rec = NeonWalRecord::Postgres {
|
||||
will_init: false,
|
||||
rec: Bytes::from_static(b""),
|
||||
};
|
||||
let rec = Value::WalRecord(rec);
|
||||
|
||||
#[rustfmt::skip]
|
||||
let expected = [
|
||||
// flattened discriminator of total 8 bytes
|
||||
0x00, 0x00, 0x00, 0x01,
|
||||
0x00, 0x00, 0x00, 0x00,
|
||||
// will_init
|
||||
0x00,
|
||||
// 8 byte length
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
];
|
||||
|
||||
roundtrip!(rec, expected);
|
||||
|
||||
assert!(!ValueBytes::will_init(&expected).unwrap());
|
||||
assert_eq!(expected.len(), 17);
|
||||
for len in 12..17 {
|
||||
assert_eq!(
|
||||
ValueBytes::will_init(&expected[..len]).unwrap_err(),
|
||||
InvalidInput::TooShortPostgresRecord
|
||||
)
|
||||
}
|
||||
for len in 0..12 {
|
||||
assert_eq!(
|
||||
ValueBytes::will_init(&expected[..len]).unwrap_err(),
|
||||
InvalidInput::TooShortValue
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn clear_visibility_map_flags_example() {
|
||||
let rec = NeonWalRecord::ClearVisibilityMapFlags {
|
||||
new_heap_blkno: Some(0x11),
|
||||
old_heap_blkno: None,
|
||||
flags: 0x03,
|
||||
};
|
||||
let rec = Value::WalRecord(rec);
|
||||
|
||||
#[rustfmt::skip]
|
||||
let expected = [
|
||||
// discriminators
|
||||
0x00, 0x00, 0x00, 0x01,
|
||||
0x00, 0x00, 0x00, 0x01,
|
||||
// Some == 1 followed by 4 bytes
|
||||
0x01, 0x00, 0x00, 0x00, 0x11,
|
||||
// None == 0
|
||||
0x00,
|
||||
// flags
|
||||
0x03
|
||||
];
|
||||
|
||||
roundtrip!(rec, expected);
|
||||
|
||||
assert!(!ValueBytes::will_init(&expected).unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -678,12 +678,19 @@ pub async fn init_tenant_mgr(
|
||||
}
|
||||
}
|
||||
}
|
||||
LocationMode::Secondary(secondary_conf) => TenantSlot::Secondary(SecondaryTenant::new(
|
||||
tenant_shard_id,
|
||||
shard_identity,
|
||||
location_conf.tenant_conf,
|
||||
&secondary_conf,
|
||||
)),
|
||||
LocationMode::Secondary(secondary_conf) => {
|
||||
info!(
|
||||
tenant_id = %tenant_shard_id.tenant_id,
|
||||
shard_id = %tenant_shard_id.shard_slug(),
|
||||
"Starting secondary tenant"
|
||||
);
|
||||
TenantSlot::Secondary(SecondaryTenant::new(
|
||||
tenant_shard_id,
|
||||
shard_identity,
|
||||
location_conf.tenant_conf,
|
||||
&secondary_conf,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
tenants.insert(tenant_shard_id, slot);
|
||||
|
||||
@@ -312,7 +312,7 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
|
||||
(detail.last_download, detail.next_download.unwrap())
|
||||
};
|
||||
|
||||
if now < next_download {
|
||||
if now > next_download {
|
||||
Some(PendingDownload {
|
||||
secondary_state: secondary_tenant,
|
||||
last_download,
|
||||
@@ -647,6 +647,12 @@ impl<'a> TenantDownloader<'a> {
|
||||
progress.bytes_downloaded += layer_byte_count;
|
||||
progress.layers_downloaded += layer_count;
|
||||
}
|
||||
|
||||
for delete_timeline in &delete_timelines {
|
||||
// We haven't removed from disk yet, but optimistically remove from in-memory state: if removal
|
||||
// from disk fails that will be a fatal error.
|
||||
detail.timelines.remove(delete_timeline);
|
||||
}
|
||||
}
|
||||
|
||||
// Execute accumulated deletions
|
||||
@@ -710,13 +716,14 @@ impl<'a> TenantDownloader<'a> {
|
||||
.await
|
||||
.map_err(UpdateError::from)?;
|
||||
|
||||
SECONDARY_MODE.download_heatmap.inc();
|
||||
|
||||
if Some(&download.etag) == prev_etag {
|
||||
Ok(HeatMapDownload::Unmodified)
|
||||
} else {
|
||||
let mut heatmap_bytes = Vec::new();
|
||||
let mut body = tokio_util::io::StreamReader::new(download.download_stream);
|
||||
let _size = tokio::io::copy_buf(&mut body, &mut heatmap_bytes).await?;
|
||||
SECONDARY_MODE.download_heatmap.inc();
|
||||
Ok(HeatMapDownload::Modified(HeatMapModified {
|
||||
etag: download.etag,
|
||||
last_modified: download.last_modified,
|
||||
|
||||
@@ -20,8 +20,8 @@
|
||||
//! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
|
||||
//! ```
|
||||
//!
|
||||
//! Every delta file consists of three parts: "summary", "index", and
|
||||
//! "values". The summary is a fixed size header at the beginning of the file,
|
||||
//! Every delta file consists of three parts: "summary", "values", and
|
||||
//! "index". The summary is a fixed size header at the beginning of the file,
|
||||
//! and it contains basic information about the layer, and offsets to the other
|
||||
//! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
|
||||
//! "values" part. The actual page images and WAL records are stored in the
|
||||
@@ -863,7 +863,7 @@ impl DeltaLayerInner {
|
||||
.into(),
|
||||
);
|
||||
|
||||
let data_end_offset = self.index_start_blk as u64 * PAGE_SZ as u64;
|
||||
let data_end_offset = self.index_start_offset();
|
||||
|
||||
let reads = Self::plan_reads(
|
||||
keyspace,
|
||||
@@ -939,7 +939,7 @@ impl DeltaLayerInner {
|
||||
}
|
||||
|
||||
if !range_end_handled {
|
||||
tracing::info!("Handling range end fallback at {}", data_end_offset);
|
||||
tracing::debug!("Handling range end fallback at {}", data_end_offset);
|
||||
planner.handle_range_end(data_end_offset);
|
||||
}
|
||||
}
|
||||
@@ -1103,11 +1103,195 @@ impl DeltaLayerInner {
|
||||
if let Some(last) = all_keys.last_mut() {
|
||||
// Last key occupies all space till end of value storage,
|
||||
// which corresponds to beginning of the index
|
||||
last.size = self.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
|
||||
last.size = self.index_start_offset() - last.size;
|
||||
}
|
||||
Ok(all_keys)
|
||||
}
|
||||
|
||||
/// Using the given writer, write out a truncated version, where LSNs higher than the
|
||||
/// truncate_at are missing.
|
||||
#[cfg(test)]
|
||||
pub(super) async fn copy_prefix(
|
||||
&self,
|
||||
writer: &mut DeltaLayerWriter,
|
||||
truncate_at: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
use crate::tenant::vectored_blob_io::{
|
||||
BlobMeta, VectoredReadBuilder, VectoredReadExtended,
|
||||
};
|
||||
use futures::stream::TryStreamExt;
|
||||
|
||||
#[derive(Debug)]
|
||||
enum Item {
|
||||
Actual(Key, Lsn, BlobRef),
|
||||
Sentinel,
|
||||
}
|
||||
|
||||
impl From<Item> for Option<(Key, Lsn, BlobRef)> {
|
||||
fn from(value: Item) -> Self {
|
||||
match value {
|
||||
Item::Actual(key, lsn, blob) => Some((key, lsn, blob)),
|
||||
Item::Sentinel => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Item {
|
||||
fn offset(&self) -> Option<BlobRef> {
|
||||
match self {
|
||||
Item::Actual(_, _, blob) => Some(*blob),
|
||||
Item::Sentinel => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn is_last(&self) -> bool {
|
||||
matches!(self, Item::Sentinel)
|
||||
}
|
||||
}
|
||||
|
||||
let block_reader = FileBlockReader::new(&self.file, self.file_id);
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
self.index_start_blk,
|
||||
self.index_root_blk,
|
||||
block_reader,
|
||||
);
|
||||
|
||||
let stream = self.stream_index_forwards(&tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
|
||||
let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
|
||||
// put in a sentinel value for getting the end offset for last item, and not having to
|
||||
// repeat the whole read part
|
||||
let stream = stream.chain(futures::stream::once(futures::future::ready(Ok(
|
||||
Item::Sentinel,
|
||||
))));
|
||||
let mut stream = std::pin::pin!(stream);
|
||||
|
||||
let mut prev: Option<(Key, Lsn, BlobRef)> = None;
|
||||
|
||||
let mut read_builder: Option<VectoredReadBuilder> = None;
|
||||
|
||||
let max_read_size = self
|
||||
.max_vectored_read_bytes
|
||||
.map(|x| x.0.get())
|
||||
.unwrap_or(8192);
|
||||
|
||||
let mut buffer = Some(BytesMut::with_capacity(max_read_size));
|
||||
|
||||
// FIXME: buffering of DeltaLayerWriter
|
||||
let mut per_blob_copy = Vec::new();
|
||||
|
||||
while let Some(item) = stream.try_next().await? {
|
||||
tracing::debug!(?item, "popped");
|
||||
let offset = item
|
||||
.offset()
|
||||
.unwrap_or(BlobRef::new(self.index_start_offset(), false));
|
||||
|
||||
let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
|
||||
let end_offset = offset;
|
||||
|
||||
Some((BlobMeta { key, lsn }, start_offset..end_offset))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let is_last = item.is_last();
|
||||
|
||||
prev = Option::from(item);
|
||||
|
||||
let actionable = actionable.filter(|x| x.0.lsn < truncate_at);
|
||||
|
||||
let builder = if let Some((meta, offsets)) = actionable {
|
||||
// extend or create a new builder
|
||||
if read_builder
|
||||
.as_mut()
|
||||
.map(|x| x.extend(offsets.start.pos(), offsets.end.pos(), meta))
|
||||
.unwrap_or(VectoredReadExtended::No)
|
||||
== VectoredReadExtended::Yes
|
||||
{
|
||||
None
|
||||
} else {
|
||||
read_builder.replace(VectoredReadBuilder::new(
|
||||
offsets.start.pos(),
|
||||
offsets.end.pos(),
|
||||
meta,
|
||||
max_read_size,
|
||||
))
|
||||
}
|
||||
} else {
|
||||
// nothing to do, except perhaps flush any existing for the last element
|
||||
None
|
||||
};
|
||||
|
||||
// flush the possible older builder and also the new one if the item was the last one
|
||||
let builders = builder.into_iter();
|
||||
let builders = if is_last {
|
||||
builders.chain(read_builder.take())
|
||||
} else {
|
||||
builders.chain(None)
|
||||
};
|
||||
|
||||
for builder in builders {
|
||||
let read = builder.build();
|
||||
|
||||
let reader = VectoredBlobReader::new(&self.file);
|
||||
|
||||
let mut buf = buffer.take().unwrap();
|
||||
|
||||
buf.clear();
|
||||
buf.reserve(read.size());
|
||||
let res = reader.read_blobs(&read, buf).await?;
|
||||
|
||||
for blob in res.blobs {
|
||||
let key = blob.meta.key;
|
||||
let lsn = blob.meta.lsn;
|
||||
let data = &res.buf[blob.start..blob.end];
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
Value::des(data)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"blob failed to deserialize for {}@{}, {}..{}: {:?}",
|
||||
blob.meta.key,
|
||||
blob.meta.lsn,
|
||||
blob.start,
|
||||
blob.end,
|
||||
utils::Hex(data)
|
||||
)
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// is it an image or will_init walrecord?
|
||||
// FIXME: this could be handled by threading the BlobRef to the
|
||||
// VectoredReadBuilder
|
||||
let will_init = crate::repository::ValueBytes::will_init(data)
|
||||
.inspect_err(|_e| {
|
||||
#[cfg(feature = "testing")]
|
||||
tracing::error!(data=?utils::Hex(data), err=?_e, "failed to parse will_init out of serialized value");
|
||||
})
|
||||
.unwrap_or(false);
|
||||
|
||||
per_blob_copy.clear();
|
||||
per_blob_copy.extend_from_slice(data);
|
||||
|
||||
let (tmp, res) = writer
|
||||
.put_value_bytes(key, lsn, std::mem::take(&mut per_blob_copy), will_init)
|
||||
.await;
|
||||
per_blob_copy = tmp;
|
||||
res?;
|
||||
}
|
||||
|
||||
buffer = Some(res.buf);
|
||||
}
|
||||
}
|
||||
|
||||
assert!(
|
||||
read_builder.is_none(),
|
||||
"with the sentinel above loop should had handled all"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
println!(
|
||||
"index_start_blk: {}, root {}",
|
||||
@@ -1177,6 +1361,44 @@ impl DeltaLayerInner {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn stream_index_forwards<'a, R>(
|
||||
&'a self,
|
||||
reader: &'a DiskBtreeReader<R, DELTA_KEY_SIZE>,
|
||||
start: &'a [u8; DELTA_KEY_SIZE],
|
||||
ctx: &'a RequestContext,
|
||||
) -> impl futures::stream::Stream<
|
||||
Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
|
||||
> + 'a
|
||||
where
|
||||
R: BlockReader,
|
||||
{
|
||||
use futures::stream::TryStreamExt;
|
||||
let stream = reader.get_stream_from(start, ctx);
|
||||
stream.map_ok(|(key, value)| {
|
||||
let key = DeltaKey::from_slice(&key);
|
||||
let (key, lsn) = (key.key(), key.lsn());
|
||||
let offset = BlobRef(value);
|
||||
|
||||
(key, lsn, offset)
|
||||
})
|
||||
}
|
||||
|
||||
/// The file offset to the first block of index.
|
||||
///
|
||||
/// The file structure is summary, values, and index. We often need this for the size of last blob.
|
||||
fn index_start_offset(&self) -> u64 {
|
||||
let offset = self.index_start_blk as u64 * PAGE_SZ as u64;
|
||||
let bref = BlobRef(offset);
|
||||
tracing::debug!(
|
||||
index_start_blk = self.index_start_blk,
|
||||
offset,
|
||||
pos = bref.pos(),
|
||||
"index_start_offset"
|
||||
);
|
||||
offset
|
||||
}
|
||||
}
|
||||
|
||||
/// A set of data associated with a delta layer key and its value
|
||||
@@ -1538,7 +1760,7 @@ mod test {
|
||||
|
||||
let resident = writer.finish(entries_meta.key_range.end, &timeline).await?;
|
||||
|
||||
let inner = resident.get_inner_delta(&ctx).await?;
|
||||
let inner = resident.as_delta(&ctx).await?;
|
||||
|
||||
let file_size = inner.file.metadata().await?.len();
|
||||
tracing::info!(
|
||||
@@ -1594,4 +1816,217 @@ mod test {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn copy_delta_prefix_smoke() {
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use bytes::Bytes;
|
||||
|
||||
let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke").unwrap();
|
||||
let (tenant, ctx) = h.load().await;
|
||||
let ctx = &ctx;
|
||||
let timeline = tenant
|
||||
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let initdb_layer = timeline
|
||||
.layers
|
||||
.read()
|
||||
.await
|
||||
.likely_resident_layers()
|
||||
.next()
|
||||
.unwrap();
|
||||
|
||||
{
|
||||
let mut writer = timeline.writer().await;
|
||||
|
||||
let data = [
|
||||
(0x20, 12, Value::Image(Bytes::from_static(b"foobar"))),
|
||||
(
|
||||
0x30,
|
||||
12,
|
||||
Value::WalRecord(NeonWalRecord::Postgres {
|
||||
will_init: false,
|
||||
rec: Bytes::from_static(b"1"),
|
||||
}),
|
||||
),
|
||||
(
|
||||
0x40,
|
||||
12,
|
||||
Value::WalRecord(NeonWalRecord::Postgres {
|
||||
will_init: true,
|
||||
rec: Bytes::from_static(b"2"),
|
||||
}),
|
||||
),
|
||||
// build an oversized value so we cannot extend and existing read over
|
||||
// this
|
||||
(
|
||||
0x50,
|
||||
12,
|
||||
Value::WalRecord(NeonWalRecord::Postgres {
|
||||
will_init: true,
|
||||
rec: {
|
||||
let mut buf =
|
||||
vec![0u8; tenant.conf.max_vectored_read_bytes.0.get() + 1024];
|
||||
buf.iter_mut()
|
||||
.enumerate()
|
||||
.for_each(|(i, slot)| *slot = (i % 256) as u8);
|
||||
Bytes::from(buf)
|
||||
},
|
||||
}),
|
||||
),
|
||||
// because the oversized read cannot be extended further, we are sure to exercise the
|
||||
// builder created on the last round with this:
|
||||
(
|
||||
0x60,
|
||||
12,
|
||||
Value::WalRecord(NeonWalRecord::Postgres {
|
||||
will_init: true,
|
||||
rec: Bytes::from_static(b"3"),
|
||||
}),
|
||||
),
|
||||
(
|
||||
0x60,
|
||||
9,
|
||||
Value::Image(Bytes::from_static(b"something for a different key")),
|
||||
),
|
||||
];
|
||||
|
||||
let mut last_lsn = None;
|
||||
|
||||
for (lsn, key, value) in data {
|
||||
let key = Key::from_i128(key);
|
||||
writer.put(key, Lsn(lsn), &value, ctx).await.unwrap();
|
||||
last_lsn = Some(lsn);
|
||||
}
|
||||
|
||||
writer.finish_write(Lsn(last_lsn.unwrap()));
|
||||
}
|
||||
timeline.freeze_and_flush().await.unwrap();
|
||||
|
||||
let new_layer = timeline
|
||||
.layers
|
||||
.read()
|
||||
.await
|
||||
.likely_resident_layers()
|
||||
.find(|x| x != &initdb_layer)
|
||||
.unwrap();
|
||||
|
||||
// create a copy for the timeline, so we don't overwrite the file
|
||||
let branch = tenant
|
||||
.branch_timeline_test(&timeline, TimelineId::generate(), None, ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(branch.get_ancestor_lsn(), Lsn(0x60));
|
||||
|
||||
// truncating at 0x61 gives us a full copy, otherwise just go backwards until there's just
|
||||
// a single key
|
||||
|
||||
for truncate_at in [0x61, 0x51, 0x41, 0x31, 0x21] {
|
||||
let truncate_at = Lsn(truncate_at);
|
||||
|
||||
let mut writer = DeltaLayerWriter::new(
|
||||
tenant.conf,
|
||||
branch.timeline_id,
|
||||
tenant.tenant_shard_id,
|
||||
Key::MIN,
|
||||
Lsn(0x11)..truncate_at,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let new_layer = new_layer.download_and_keep_resident().await.unwrap();
|
||||
|
||||
new_layer
|
||||
.copy_delta_prefix(&mut writer, truncate_at, ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let copied_layer = writer.finish(Key::MAX, &branch).await.unwrap();
|
||||
|
||||
copied_layer.as_delta(ctx).await.unwrap();
|
||||
|
||||
assert_keys_and_values_eq(
|
||||
new_layer.as_delta(ctx).await.unwrap(),
|
||||
copied_layer.as_delta(ctx).await.unwrap(),
|
||||
truncate_at,
|
||||
ctx,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn assert_keys_and_values_eq(
|
||||
source: &DeltaLayerInner,
|
||||
truncated: &DeltaLayerInner,
|
||||
truncated_at: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) {
|
||||
use futures::future::ready;
|
||||
use futures::stream::TryStreamExt;
|
||||
|
||||
let start_key = [0u8; DELTA_KEY_SIZE];
|
||||
|
||||
let source_reader = FileBlockReader::new(&source.file, source.file_id);
|
||||
let source_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
source.index_start_blk,
|
||||
source.index_root_blk,
|
||||
&source_reader,
|
||||
);
|
||||
let source_stream = source.stream_index_forwards(&source_tree, &start_key, ctx);
|
||||
let source_stream = source_stream.filter(|res| match res {
|
||||
Ok((_, lsn, _)) => ready(lsn < &truncated_at),
|
||||
_ => ready(true),
|
||||
});
|
||||
let mut source_stream = std::pin::pin!(source_stream);
|
||||
|
||||
let truncated_reader = FileBlockReader::new(&truncated.file, truncated.file_id);
|
||||
let truncated_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
truncated.index_start_blk,
|
||||
truncated.index_root_blk,
|
||||
&truncated_reader,
|
||||
);
|
||||
let truncated_stream = truncated.stream_index_forwards(&truncated_tree, &start_key, ctx);
|
||||
let mut truncated_stream = std::pin::pin!(truncated_stream);
|
||||
|
||||
let mut scratch_left = Vec::new();
|
||||
let mut scratch_right = Vec::new();
|
||||
|
||||
loop {
|
||||
let (src, truncated) = (source_stream.try_next(), truncated_stream.try_next());
|
||||
let (src, truncated) = tokio::try_join!(src, truncated).unwrap();
|
||||
|
||||
if src.is_none() {
|
||||
assert!(truncated.is_none());
|
||||
break;
|
||||
}
|
||||
|
||||
let (src, truncated) = (src.unwrap(), truncated.unwrap());
|
||||
|
||||
// because we've filtered the source with Lsn, we should always have the same keys from both.
|
||||
assert_eq!(src.0, truncated.0);
|
||||
assert_eq!(src.1, truncated.1);
|
||||
|
||||
// if this is needed for something else, just drop this assert.
|
||||
assert!(
|
||||
src.2.pos() >= truncated.2.pos(),
|
||||
"value position should not go backwards {} vs. {}",
|
||||
src.2.pos(),
|
||||
truncated.2.pos()
|
||||
);
|
||||
|
||||
scratch_left.clear();
|
||||
let src_cursor = source_reader.block_cursor();
|
||||
let left = src_cursor.read_blob_into_buf(src.2.pos(), &mut scratch_left, ctx);
|
||||
scratch_right.clear();
|
||||
let trunc_cursor = truncated_reader.block_cursor();
|
||||
let right = trunc_cursor.read_blob_into_buf(truncated.2.pos(), &mut scratch_right, ctx);
|
||||
|
||||
tokio::try_join!(left, right).unwrap();
|
||||
|
||||
assert_eq!(utils::Hex(&scratch_left), utils::Hex(&scratch_right));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -116,6 +116,12 @@ impl AsLayerDesc for Layer {
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for Layer {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
Arc::as_ptr(&self.0) == Arc::as_ptr(&other.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Layer {
|
||||
/// Creates a layer value for a file we know to not be resident.
|
||||
pub(crate) fn for_evicted(
|
||||
@@ -1752,6 +1758,28 @@ impl ResidentLayer {
|
||||
}
|
||||
}
|
||||
|
||||
/// FIXME: truncate is bad name because we are not truncating anything, but copying the
|
||||
/// filtered parts.
|
||||
#[cfg(test)]
|
||||
pub(super) async fn copy_delta_prefix(
|
||||
&self,
|
||||
writer: &mut super::delta_layer::DeltaLayerWriter,
|
||||
truncate_at: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
use LayerKind::*;
|
||||
|
||||
let owner = &self.owner.0;
|
||||
|
||||
match self.downloaded.get(owner, ctx).await? {
|
||||
Delta(ref d) => d
|
||||
.copy_prefix(writer, truncate_at, ctx)
|
||||
.await
|
||||
.with_context(|| format!("truncate {self}")),
|
||||
Image(_) => anyhow::bail!(format!("cannot truncate image layer {self}")),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn local_path(&self) -> &Utf8Path {
|
||||
&self.owner.0.path
|
||||
}
|
||||
@@ -1761,14 +1789,14 @@ impl ResidentLayer {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) async fn get_inner_delta<'a>(
|
||||
&'a self,
|
||||
pub(crate) async fn as_delta(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<&'a delta_layer::DeltaLayerInner> {
|
||||
let owner = &self.owner.0;
|
||||
match self.downloaded.get(owner, ctx).await? {
|
||||
LayerKind::Delta(d) => Ok(d),
|
||||
LayerKind::Image(_) => Err(anyhow::anyhow!("Expected a delta layer")),
|
||||
) -> anyhow::Result<&delta_layer::DeltaLayerInner> {
|
||||
use LayerKind::*;
|
||||
match self.downloaded.get(&self.owner.0, ctx).await? {
|
||||
Delta(ref d) => Ok(d),
|
||||
Image(_) => Err(anyhow::anyhow!("image layer")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -61,18 +61,18 @@ pub struct VectoredRead {
|
||||
}
|
||||
|
||||
impl VectoredRead {
|
||||
pub fn size(&self) -> usize {
|
||||
pub(crate) fn size(&self) -> usize {
|
||||
(self.end - self.start) as usize
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq)]
|
||||
enum VectoredReadExtended {
|
||||
pub(crate) enum VectoredReadExtended {
|
||||
Yes,
|
||||
No,
|
||||
}
|
||||
|
||||
struct VectoredReadBuilder {
|
||||
pub(crate) struct VectoredReadBuilder {
|
||||
start: u64,
|
||||
end: u64,
|
||||
blobs_at: VecMap<u64, BlobMeta>,
|
||||
@@ -80,7 +80,17 @@ struct VectoredReadBuilder {
|
||||
}
|
||||
|
||||
impl VectoredReadBuilder {
|
||||
fn new(start_offset: u64, end_offset: u64, meta: BlobMeta, max_read_size: usize) -> Self {
|
||||
/// Start building a new vectored read.
|
||||
///
|
||||
/// Note that by design, this does not check against reading more than `max_read_size` to
|
||||
/// support reading larger blobs than the configuration value. The builder will be single use
|
||||
/// however after that.
|
||||
pub(crate) fn new(
|
||||
start_offset: u64,
|
||||
end_offset: u64,
|
||||
meta: BlobMeta,
|
||||
max_read_size: usize,
|
||||
) -> Self {
|
||||
let mut blobs_at = VecMap::default();
|
||||
blobs_at
|
||||
.append(start_offset, meta)
|
||||
@@ -97,7 +107,8 @@ impl VectoredReadBuilder {
|
||||
/// Attempt to extend the current read with a new blob if the start
|
||||
/// offset matches with the current end of the vectored read
|
||||
/// and the resuting size is below the max read size
|
||||
fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
|
||||
pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
|
||||
tracing::trace!(start, end, "trying to extend");
|
||||
let size = (end - start) as usize;
|
||||
if self.end == start && self.size() + size <= self.max_read_size {
|
||||
self.end = end;
|
||||
@@ -111,11 +122,11 @@ impl VectoredReadBuilder {
|
||||
VectoredReadExtended::No
|
||||
}
|
||||
|
||||
fn size(&self) -> usize {
|
||||
pub(crate) fn size(&self) -> usize {
|
||||
(self.end - self.start) as usize
|
||||
}
|
||||
|
||||
fn build(self) -> VectoredRead {
|
||||
pub(crate) fn build(self) -> VectoredRead {
|
||||
VectoredRead {
|
||||
start: self.start,
|
||||
end: self.end,
|
||||
|
||||
@@ -55,6 +55,7 @@ impl NeonWalRecord {
|
||||
/// Does replaying this WAL record initialize the page from scratch, or does
|
||||
/// it need to be applied over the previous image of the page?
|
||||
pub fn will_init(&self) -> bool {
|
||||
// If you change this function, you'll also need to change ValueBytes::will_init
|
||||
match self {
|
||||
NeonWalRecord::Postgres { will_init, rec: _ } => *will_init,
|
||||
|
||||
|
||||
@@ -331,7 +331,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
let proxy_listener = TcpListener::bind(proxy_address).await?;
|
||||
let cancellation_token = CancellationToken::new();
|
||||
|
||||
let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new(&config.endpoint_rps_limit));
|
||||
let cancel_map = CancelMap::default();
|
||||
|
||||
let redis_publisher = match ®ional_redis_client {
|
||||
@@ -357,7 +356,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
config,
|
||||
proxy_listener,
|
||||
cancellation_token.clone(),
|
||||
endpoint_rate_limiter.clone(),
|
||||
cancellation_handler.clone(),
|
||||
));
|
||||
|
||||
@@ -372,7 +370,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
config,
|
||||
serverless_listener,
|
||||
cancellation_token.clone(),
|
||||
endpoint_rate_limiter.clone(),
|
||||
cancellation_handler.clone(),
|
||||
));
|
||||
}
|
||||
@@ -533,7 +530,11 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
let url = args.auth_endpoint.parse()?;
|
||||
let endpoint = http::Endpoint::new(url, http::new_client());
|
||||
|
||||
let api = console::provider::neon::Api::new(endpoint, caches, locks);
|
||||
let mut endpoint_rps_limit = args.endpoint_rps_limit.clone();
|
||||
RateBucketInfo::validate(&mut endpoint_rps_limit)?;
|
||||
let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new(endpoint_rps_limit));
|
||||
let api =
|
||||
console::provider::neon::Api::new(endpoint, caches, locks, endpoint_rate_limiter);
|
||||
let api = console::provider::ConsoleBackend::Console(api);
|
||||
auth::BackendType::Console(MaybeOwned::Owned(api), ())
|
||||
}
|
||||
@@ -567,8 +568,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
rate_limit_ip_subnet: args.auth_rate_limit_ip_subnet,
|
||||
};
|
||||
|
||||
let mut endpoint_rps_limit = args.endpoint_rps_limit.clone();
|
||||
RateBucketInfo::validate(&mut endpoint_rps_limit)?;
|
||||
let mut redis_rps_limit = args.redis_rps_limit.clone();
|
||||
RateBucketInfo::validate(&mut redis_rps_limit)?;
|
||||
|
||||
@@ -581,7 +580,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
authentication_config,
|
||||
require_client_ip: args.require_client_ip,
|
||||
disable_ip_check_for_http: args.disable_ip_check_for_http,
|
||||
endpoint_rps_limit,
|
||||
redis_rps_limit,
|
||||
handshake_timeout: args.handshake_timeout,
|
||||
region: args.region.clone(),
|
||||
|
||||
18
proxy/src/cache/endpoints.rs
vendored
18
proxy/src/cache/endpoints.rs
vendored
@@ -70,20 +70,14 @@ impl EndpointsCache {
|
||||
if !self.ready.load(Ordering::Acquire) {
|
||||
return true;
|
||||
}
|
||||
// If cache is disabled, just collect the metrics and return.
|
||||
if self.config.disable_cache {
|
||||
let rejected = self.should_reject(endpoint);
|
||||
ctx.set_rejected(rejected);
|
||||
info!(?rejected, "check endpoint is valid, disabled cache");
|
||||
return true;
|
||||
}
|
||||
// If the limiter allows, we don't need to check the cache.
|
||||
if self.limiter.lock().await.check() {
|
||||
return true;
|
||||
}
|
||||
let rejected = self.should_reject(endpoint);
|
||||
info!(?rejected, "check endpoint is valid, enabled cache");
|
||||
ctx.set_rejected(rejected);
|
||||
info!(?rejected, "check endpoint is valid, disabled cache");
|
||||
// If cache is disabled, just collect the metrics and return or
|
||||
// If the limiter allows, we don't need to check the cache.
|
||||
if self.config.disable_cache || self.limiter.lock().await.check() {
|
||||
return true;
|
||||
}
|
||||
!rejected
|
||||
}
|
||||
fn should_reject(&self, endpoint: &EndpointId) -> bool {
|
||||
|
||||
@@ -29,7 +29,6 @@ pub struct ProxyConfig {
|
||||
pub authentication_config: AuthenticationConfig,
|
||||
pub require_client_ip: bool,
|
||||
pub disable_ip_check_for_http: bool,
|
||||
pub endpoint_rps_limit: Vec<RateBucketInfo>,
|
||||
pub redis_rps_limit: Vec<RateBucketInfo>,
|
||||
pub region: String,
|
||||
pub handshake_timeout: Duration,
|
||||
|
||||
@@ -208,6 +208,9 @@ pub mod errors {
|
||||
#[error(transparent)]
|
||||
ApiError(ApiError),
|
||||
|
||||
#[error("Too many connections attempts")]
|
||||
TooManyConnections,
|
||||
|
||||
#[error("Timeout waiting to acquire wake compute lock")]
|
||||
TimeoutError,
|
||||
}
|
||||
@@ -240,6 +243,8 @@ pub mod errors {
|
||||
// However, API might return a meaningful error.
|
||||
ApiError(e) => e.to_string_client(),
|
||||
|
||||
TooManyConnections => self.to_string(),
|
||||
|
||||
TimeoutError => "timeout while acquiring the compute resource lock".to_owned(),
|
||||
}
|
||||
}
|
||||
@@ -250,6 +255,7 @@ pub mod errors {
|
||||
match self {
|
||||
WakeComputeError::BadComputeAddress(_) => crate::error::ErrorKind::ControlPlane,
|
||||
WakeComputeError::ApiError(e) => e.get_error_kind(),
|
||||
WakeComputeError::TooManyConnections => crate::error::ErrorKind::RateLimit,
|
||||
WakeComputeError::TimeoutError => crate::error::ErrorKind::ServiceRateLimit,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ use crate::{
|
||||
console::messages::ColdStartInfo,
|
||||
http,
|
||||
metrics::{CacheOutcome, Metrics},
|
||||
rate_limiter::EndpointRateLimiter,
|
||||
scram, Normalize,
|
||||
};
|
||||
use crate::{cache::Cached, context::RequestMonitoring};
|
||||
@@ -25,6 +26,7 @@ pub struct Api {
|
||||
endpoint: http::Endpoint,
|
||||
pub caches: &'static ApiCaches,
|
||||
pub locks: &'static ApiLocks,
|
||||
pub endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
jwt: String,
|
||||
}
|
||||
|
||||
@@ -34,6 +36,7 @@ impl Api {
|
||||
endpoint: http::Endpoint,
|
||||
caches: &'static ApiCaches,
|
||||
locks: &'static ApiLocks,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
) -> Self {
|
||||
let jwt: String = match std::env::var("NEON_PROXY_TO_CONTROLPLANE_TOKEN") {
|
||||
Ok(v) => v,
|
||||
@@ -43,6 +46,7 @@ impl Api {
|
||||
endpoint,
|
||||
caches,
|
||||
locks,
|
||||
endpoint_rate_limiter,
|
||||
jwt,
|
||||
}
|
||||
}
|
||||
@@ -277,6 +281,14 @@ impl super::Api for Api {
|
||||
return Ok(cached);
|
||||
}
|
||||
|
||||
// check rate limit
|
||||
if !self
|
||||
.endpoint_rate_limiter
|
||||
.check(user_info.endpoint.normalize().into(), 1)
|
||||
{
|
||||
return Err(WakeComputeError::TooManyConnections);
|
||||
}
|
||||
|
||||
let permit = self.locks.get_wake_compute_permit(&key).await?;
|
||||
|
||||
// after getting back a permit - it's possible the cache was filled
|
||||
|
||||
@@ -51,7 +51,7 @@ pub struct RequestMonitoring {
|
||||
sender: Option<mpsc::UnboundedSender<RequestData>>,
|
||||
pub latency_timer: LatencyTimer,
|
||||
// Whether proxy decided that it's not a valid endpoint end rejected it before going to cplane.
|
||||
rejected: bool,
|
||||
rejected: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -96,7 +96,7 @@ impl RequestMonitoring {
|
||||
error_kind: None,
|
||||
auth_method: None,
|
||||
success: false,
|
||||
rejected: false,
|
||||
rejected: None,
|
||||
cold_start_info: ColdStartInfo::Unknown,
|
||||
|
||||
sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
|
||||
@@ -118,7 +118,7 @@ impl RequestMonitoring {
|
||||
}
|
||||
|
||||
pub fn set_rejected(&mut self, rejected: bool) {
|
||||
self.rejected = rejected;
|
||||
self.rejected = Some(rejected);
|
||||
}
|
||||
|
||||
pub fn set_cold_start_info(&mut self, info: ColdStartInfo) {
|
||||
@@ -200,27 +200,28 @@ impl Drop for RequestMonitoring {
|
||||
} else {
|
||||
ConnectOutcome::Failed
|
||||
};
|
||||
let rejected = self.rejected;
|
||||
let ep = self
|
||||
.endpoint_id
|
||||
.as_ref()
|
||||
.map(|x| x.as_str())
|
||||
.unwrap_or_default();
|
||||
// This makes sense only if cache is disabled
|
||||
info!(
|
||||
?ep,
|
||||
?outcome,
|
||||
?rejected,
|
||||
"check endpoint is valid with outcome"
|
||||
);
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.invalid_endpoints_total
|
||||
.inc(InvalidEndpointsGroup {
|
||||
protocol: self.protocol,
|
||||
rejected: rejected.into(),
|
||||
outcome,
|
||||
});
|
||||
if let Some(rejected) = self.rejected {
|
||||
let ep = self
|
||||
.endpoint_id
|
||||
.as_ref()
|
||||
.map(|x| x.as_str())
|
||||
.unwrap_or_default();
|
||||
// This makes sense only if cache is disabled
|
||||
info!(
|
||||
?outcome,
|
||||
?rejected,
|
||||
?ep,
|
||||
"check endpoint is valid with outcome"
|
||||
);
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.invalid_endpoints_total
|
||||
.inc(InvalidEndpointsGroup {
|
||||
protocol: self.protocol,
|
||||
rejected: rejected.into(),
|
||||
outcome,
|
||||
});
|
||||
}
|
||||
if let Some(tx) = self.sender.take() {
|
||||
let _: Result<(), _> = tx.send(RequestData::from(&*self));
|
||||
}
|
||||
|
||||
@@ -19,9 +19,8 @@ use crate::{
|
||||
metrics::{Metrics, NumClientConnectionsGuard},
|
||||
protocol2::WithClientIp,
|
||||
proxy::handshake::{handshake, HandshakeData},
|
||||
rate_limiter::EndpointRateLimiter,
|
||||
stream::{PqStream, Stream},
|
||||
EndpointCacheKey, Normalize,
|
||||
EndpointCacheKey,
|
||||
};
|
||||
use futures::TryFutureExt;
|
||||
use itertools::Itertools;
|
||||
@@ -61,7 +60,6 @@ pub async fn task_main(
|
||||
config: &'static ProxyConfig,
|
||||
listener: tokio::net::TcpListener,
|
||||
cancellation_token: CancellationToken,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
cancellation_handler: Arc<CancellationHandlerMain>,
|
||||
) -> anyhow::Result<()> {
|
||||
scopeguard::defer! {
|
||||
@@ -86,7 +84,6 @@ pub async fn task_main(
|
||||
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
let cancellation_handler = Arc::clone(&cancellation_handler);
|
||||
let endpoint_rate_limiter = endpoint_rate_limiter.clone();
|
||||
|
||||
tracing::info!(protocol = "tcp", %session_id, "accepted new TCP connection");
|
||||
|
||||
@@ -128,7 +125,6 @@ pub async fn task_main(
|
||||
cancellation_handler,
|
||||
socket,
|
||||
ClientMode::Tcp,
|
||||
endpoint_rate_limiter,
|
||||
conn_gauge,
|
||||
)
|
||||
.instrument(span.clone())
|
||||
@@ -242,7 +238,6 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
cancellation_handler: Arc<CancellationHandlerMain>,
|
||||
stream: S,
|
||||
mode: ClientMode,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
conn_gauge: NumClientConnectionsGuard<'static>,
|
||||
) -> Result<Option<ProxyPassthrough<CancellationHandlerMainInternal, S>>, ClientRequestError> {
|
||||
info!(
|
||||
@@ -288,15 +283,6 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
Err(e) => stream.throw_error(e).await?,
|
||||
};
|
||||
|
||||
// check rate limit
|
||||
if let Some(ep) = user_info.get_endpoint() {
|
||||
if !endpoint_rate_limiter.check(ep.normalize(), 1) {
|
||||
return stream
|
||||
.throw_error(auth::AuthError::too_many_connections())
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
let user = user_info.get_user().to_owned();
|
||||
let user_info = match user_info
|
||||
.authenticate(
|
||||
|
||||
@@ -90,6 +90,7 @@ fn report_error(e: &WakeComputeError, retry: bool) {
|
||||
WakeComputeError::ApiError(ApiError::Console { .. }) => {
|
||||
WakeupFailureKind::ApiConsoleOtherError
|
||||
}
|
||||
WakeComputeError::TooManyConnections => WakeupFailureKind::ApiConsoleLocked,
|
||||
WakeComputeError::TimeoutError => WakeupFailureKind::TimeoutError,
|
||||
};
|
||||
Metrics::get()
|
||||
|
||||
@@ -15,7 +15,7 @@ use rand::{rngs::StdRng, Rng, SeedableRng};
|
||||
use tokio::time::{Duration, Instant};
|
||||
use tracing::info;
|
||||
|
||||
use crate::EndpointId;
|
||||
use crate::intern::EndpointIdInt;
|
||||
|
||||
pub struct GlobalRateLimiter {
|
||||
data: Vec<RateBucket>,
|
||||
@@ -61,12 +61,7 @@ impl GlobalRateLimiter {
|
||||
// Purposefully ignore user name and database name as clients can reconnect
|
||||
// with different names, so we'll end up sending some http requests to
|
||||
// the control plane.
|
||||
//
|
||||
// We also may save quite a lot of CPU (I think) by bailing out right after we
|
||||
// saw SNI, before doing TLS handshake. User-side error messages in that case
|
||||
// does not look very nice (`SSL SYSCALL error: Undefined error: 0`), so for now
|
||||
// I went with a more expensive way that yields user-friendlier error messages.
|
||||
pub type EndpointRateLimiter = BucketRateLimiter<EndpointId, StdRng, RandomState>;
|
||||
pub type EndpointRateLimiter = BucketRateLimiter<EndpointIdInt, StdRng, RandomState>;
|
||||
|
||||
pub struct BucketRateLimiter<Key, Rand = StdRng, Hasher = RandomState> {
|
||||
map: DashMap<Key, Vec<RateBucket>, Hasher>,
|
||||
@@ -245,7 +240,7 @@ mod tests {
|
||||
use tokio::time;
|
||||
|
||||
use super::{BucketRateLimiter, EndpointRateLimiter};
|
||||
use crate::{rate_limiter::RateBucketInfo, EndpointId};
|
||||
use crate::{intern::EndpointIdInt, rate_limiter::RateBucketInfo, EndpointId};
|
||||
|
||||
#[test]
|
||||
fn rate_bucket_rpi() {
|
||||
@@ -295,39 +290,40 @@ mod tests {
|
||||
let limiter = EndpointRateLimiter::new(rates);
|
||||
|
||||
let endpoint = EndpointId::from("ep-my-endpoint-1234");
|
||||
let endpoint = EndpointIdInt::from(endpoint);
|
||||
|
||||
time::pause();
|
||||
|
||||
for _ in 0..100 {
|
||||
assert!(limiter.check(endpoint.clone(), 1));
|
||||
assert!(limiter.check(endpoint, 1));
|
||||
}
|
||||
// more connections fail
|
||||
assert!(!limiter.check(endpoint.clone(), 1));
|
||||
assert!(!limiter.check(endpoint, 1));
|
||||
|
||||
// fail even after 500ms as it's in the same bucket
|
||||
time::advance(time::Duration::from_millis(500)).await;
|
||||
assert!(!limiter.check(endpoint.clone(), 1));
|
||||
assert!(!limiter.check(endpoint, 1));
|
||||
|
||||
// after a full 1s, 100 requests are allowed again
|
||||
time::advance(time::Duration::from_millis(500)).await;
|
||||
for _ in 1..6 {
|
||||
for _ in 0..50 {
|
||||
assert!(limiter.check(endpoint.clone(), 2));
|
||||
assert!(limiter.check(endpoint, 2));
|
||||
}
|
||||
time::advance(time::Duration::from_millis(1000)).await;
|
||||
}
|
||||
|
||||
// more connections after 600 will exceed the 20rps@30s limit
|
||||
assert!(!limiter.check(endpoint.clone(), 1));
|
||||
assert!(!limiter.check(endpoint, 1));
|
||||
|
||||
// will still fail before the 30 second limit
|
||||
time::advance(time::Duration::from_millis(30_000 - 6_000 - 1)).await;
|
||||
assert!(!limiter.check(endpoint.clone(), 1));
|
||||
assert!(!limiter.check(endpoint, 1));
|
||||
|
||||
// after the full 30 seconds, 100 requests are allowed again
|
||||
time::advance(time::Duration::from_millis(1)).await;
|
||||
for _ in 0..100 {
|
||||
assert!(limiter.check(endpoint.clone(), 1));
|
||||
assert!(limiter.check(endpoint, 1));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -35,7 +35,6 @@ use crate::context::RequestMonitoring;
|
||||
use crate::metrics::Metrics;
|
||||
use crate::protocol2::WithClientIp;
|
||||
use crate::proxy::run_until_cancelled;
|
||||
use crate::rate_limiter::EndpointRateLimiter;
|
||||
use crate::serverless::backend::PoolingBackend;
|
||||
use crate::serverless::http_util::{api_error_into_response, json_response};
|
||||
|
||||
@@ -53,7 +52,6 @@ pub async fn task_main(
|
||||
config: &'static ProxyConfig,
|
||||
ws_listener: TcpListener,
|
||||
cancellation_token: CancellationToken,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
cancellation_handler: Arc<CancellationHandlerMain>,
|
||||
) -> anyhow::Result<()> {
|
||||
scopeguard::defer! {
|
||||
@@ -117,7 +115,6 @@ pub async fn task_main(
|
||||
backend.clone(),
|
||||
connections.clone(),
|
||||
cancellation_handler.clone(),
|
||||
endpoint_rate_limiter.clone(),
|
||||
cancellation_token.clone(),
|
||||
server.clone(),
|
||||
tls_acceptor.clone(),
|
||||
@@ -147,7 +144,6 @@ async fn connection_handler(
|
||||
backend: Arc<PoolingBackend>,
|
||||
connections: TaskTracker,
|
||||
cancellation_handler: Arc<CancellationHandlerMain>,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
cancellation_token: CancellationToken,
|
||||
server: Builder<TokioExecutor>,
|
||||
tls_acceptor: TlsAcceptor,
|
||||
@@ -231,7 +227,6 @@ async fn connection_handler(
|
||||
cancellation_handler.clone(),
|
||||
session_id,
|
||||
peer_addr,
|
||||
endpoint_rate_limiter.clone(),
|
||||
http_request_token,
|
||||
)
|
||||
.in_current_span()
|
||||
@@ -270,7 +265,6 @@ async fn request_handler(
|
||||
cancellation_handler: Arc<CancellationHandlerMain>,
|
||||
session_id: uuid::Uuid,
|
||||
peer_addr: IpAddr,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
// used to cancel in-flight HTTP requests. not used to cancel websockets
|
||||
http_cancellation_token: CancellationToken,
|
||||
) -> Result<Response<Full<Bytes>>, ApiError> {
|
||||
@@ -298,15 +292,9 @@ async fn request_handler(
|
||||
|
||||
ws_connections.spawn(
|
||||
async move {
|
||||
if let Err(e) = websocket::serve_websocket(
|
||||
config,
|
||||
ctx,
|
||||
websocket,
|
||||
cancellation_handler,
|
||||
host,
|
||||
endpoint_rate_limiter,
|
||||
)
|
||||
.await
|
||||
if let Err(e) =
|
||||
websocket::serve_websocket(config, ctx, websocket, cancellation_handler, host)
|
||||
.await
|
||||
{
|
||||
error!("error in websocket connection: {e:#}");
|
||||
}
|
||||
|
||||
@@ -5,7 +5,6 @@ use crate::{
|
||||
error::{io_error, ReportableError},
|
||||
metrics::Metrics,
|
||||
proxy::{handle_client, ClientMode},
|
||||
rate_limiter::EndpointRateLimiter,
|
||||
};
|
||||
use bytes::{Buf, Bytes};
|
||||
use futures::{Sink, Stream};
|
||||
@@ -136,7 +135,6 @@ pub async fn serve_websocket(
|
||||
websocket: HyperWebsocket,
|
||||
cancellation_handler: Arc<CancellationHandlerMain>,
|
||||
hostname: Option<String>,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
) -> anyhow::Result<()> {
|
||||
let websocket = websocket.await?;
|
||||
let conn_gauge = Metrics::get()
|
||||
@@ -150,7 +148,6 @@ pub async fn serve_websocket(
|
||||
cancellation_handler,
|
||||
WebSocketRw::new(websocket),
|
||||
ClientMode::Websockets { hostname },
|
||||
endpoint_rate_limiter,
|
||||
conn_gauge,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import json
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
@@ -582,6 +583,91 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
)
|
||||
|
||||
|
||||
def test_secondary_background_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Slow test that runs in realtime, checks that the background scheduling of secondary
|
||||
downloads happens as expected.
|
||||
"""
|
||||
neon_env_builder.num_pageservers = 2
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
# Create this many tenants, each with two timelines
|
||||
tenant_count = 4
|
||||
tenant_timelines = {}
|
||||
|
||||
# This mirrors a constant in `downloader.rs`
|
||||
freshen_interval_secs = 60
|
||||
|
||||
for _i in range(0, tenant_count):
|
||||
tenant_id = TenantId.generate()
|
||||
timeline_a = TimelineId.generate()
|
||||
timeline_b = TimelineId.generate()
|
||||
env.neon_cli.create_tenant(
|
||||
tenant_id,
|
||||
timeline_a,
|
||||
placement_policy='{"Attached":1}',
|
||||
# Run with a low heatmap period so that we can avoid having to do synthetic API calls
|
||||
# to trigger the upload promptly.
|
||||
conf={"heatmap_period": "1s"},
|
||||
)
|
||||
env.neon_cli.create_timeline("main2", tenant_id, timeline_b)
|
||||
|
||||
tenant_timelines[tenant_id] = [timeline_a, timeline_b]
|
||||
|
||||
t_start = time.time()
|
||||
|
||||
# Wait long enough that the background downloads should happen; we expect all the inital layers
|
||||
# of all the initial timelines to show up on the secondary location of each tenant.
|
||||
time.sleep(freshen_interval_secs * 1.5)
|
||||
|
||||
for tenant_id, timelines in tenant_timelines.items():
|
||||
attached_to_id = env.storage_controller.locate(tenant_id)[0]["node_id"]
|
||||
ps_attached = env.get_pageserver(attached_to_id)
|
||||
# We only have two: the other one must be secondary
|
||||
ps_secondary = next(p for p in env.pageservers if p != ps_attached)
|
||||
|
||||
for timeline_id in timelines:
|
||||
log.info(f"Checking for secondary timeline {timeline_id} on node {ps_secondary.id}")
|
||||
# One or more layers should be present for all timelines
|
||||
assert list_layers(ps_secondary, tenant_id, timeline_id)
|
||||
|
||||
# Delete the second timeline: this should be reflected later on the secondary
|
||||
env.storage_controller.pageserver_api().timeline_delete(tenant_id, timelines[1])
|
||||
|
||||
# Wait long enough for the secondary locations to see the deletion
|
||||
time.sleep(freshen_interval_secs * 1.5)
|
||||
|
||||
for tenant_id, timelines in tenant_timelines.items():
|
||||
attached_to_id = env.storage_controller.locate(tenant_id)[0]["node_id"]
|
||||
ps_attached = env.get_pageserver(attached_to_id)
|
||||
# We only have two: the other one must be secondary
|
||||
ps_secondary = next(p for p in env.pageservers if p != ps_attached)
|
||||
|
||||
# This one was not deleted
|
||||
assert list_layers(ps_secondary, tenant_id, timelines[0])
|
||||
|
||||
# This one was deleted
|
||||
assert not list_layers(ps_secondary, tenant_id, timelines[1])
|
||||
|
||||
t_end = time.time()
|
||||
|
||||
# Measure how many heatmap downloads we did in total: this checks that we succeeded with
|
||||
# proper scheduling, and not some bug that just runs downloads in a loop.
|
||||
total_heatmap_downloads = 0
|
||||
for ps in env.pageservers:
|
||||
v = ps.http_client().get_metric_value("pageserver_secondary_download_heatmap_total")
|
||||
assert v is not None
|
||||
total_heatmap_downloads += int(v)
|
||||
|
||||
download_rate = (total_heatmap_downloads / tenant_count) / (t_end - t_start)
|
||||
|
||||
expect_download_rate = 1.0 / freshen_interval_secs
|
||||
log.info(f"Download rate: {download_rate * 60}/min vs expected {expect_download_rate * 60}/min")
|
||||
|
||||
assert download_rate < expect_download_rate * 2
|
||||
|
||||
|
||||
@pytest.mark.skipif(os.environ.get("BUILD_TYPE") == "debug", reason="only run with release build")
|
||||
@pytest.mark.parametrize("via_controller", [True, False])
|
||||
def test_slow_secondary_downloads(neon_env_builder: NeonEnvBuilder, via_controller: bool):
|
||||
|
||||
Reference in New Issue
Block a user