mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-29 19:10:38 +00:00
Merge branch 'main' into skyzh/aux-file-flag-v2-again
This commit is contained in:
88
Cargo.lock
generated
88
Cargo.lock
generated
@@ -25,9 +25,9 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
|
||||
|
||||
[[package]]
|
||||
name = "ahash"
|
||||
version = "0.8.9"
|
||||
version = "0.8.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d713b3834d76b85304d4d525563c1276e2e30dc97cc67bfb4585a4a29fc2c89f"
|
||||
checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"const-random",
|
||||
@@ -284,9 +284,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
|
||||
|
||||
[[package]]
|
||||
name = "aws-config"
|
||||
version = "1.1.4"
|
||||
version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b30c39ebe61f75d1b3785362b1586b41991873c9ab3e317a9181c246fb71d82"
|
||||
checksum = "baaa0be6ee7d90b775ae6ccb6d2ba182b91219ec2001f92338773a094246af1d"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-runtime",
|
||||
@@ -309,14 +309,15 @@ dependencies = [
|
||||
"time",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"url",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "aws-credential-types"
|
||||
version = "1.1.8"
|
||||
version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fa8587ae17c8e967e4b05a62d495be2fb7701bec52a97f7acfe8a29f938384c8"
|
||||
checksum = "e16838e6c9e12125face1c1eff1343c75e3ff540de98ff7ebd61874a89bcfeb9"
|
||||
dependencies = [
|
||||
"aws-smithy-async",
|
||||
"aws-smithy-runtime-api",
|
||||
@@ -326,9 +327,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-runtime"
|
||||
version = "1.1.8"
|
||||
version = "1.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b13dc54b4b49f8288532334bba8f87386a40571c47c37b1304979b556dc613c8"
|
||||
checksum = "785da4a15e7b166b505fd577e4560c7a7cd8fbdf842eb1336cbcbf8944ce56f1"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-sigv4",
|
||||
@@ -373,10 +374,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-sdk-s3"
|
||||
version = "1.14.0"
|
||||
version = "1.26.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "951f7730f51a2155c711c85c79f337fbc02a577fa99d2a0a8059acfce5392113"
|
||||
checksum = "7bc5ce518d4b8d16e0408de7bdf1b3097cec61a7daa979750a208f8d9934386d"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
"aws-credential-types",
|
||||
"aws-runtime",
|
||||
"aws-sigv4",
|
||||
@@ -391,20 +393,25 @@ dependencies = [
|
||||
"aws-smithy-xml",
|
||||
"aws-types",
|
||||
"bytes",
|
||||
"fastrand 2.0.0",
|
||||
"hex",
|
||||
"hmac",
|
||||
"http 0.2.9",
|
||||
"http-body 0.4.5",
|
||||
"lru",
|
||||
"once_cell",
|
||||
"percent-encoding",
|
||||
"regex-lite",
|
||||
"sha2",
|
||||
"tracing",
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "aws-sdk-sso"
|
||||
version = "1.12.0"
|
||||
version = "1.22.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f486420a66caad72635bc2ce0ff6581646e0d32df02aa39dc983bfe794955a5b"
|
||||
checksum = "ca3d6c4cba4e009391b72b0fcf12aff04ea3c9c3aa2ecaafa330326a8bd7e601"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-runtime",
|
||||
@@ -424,9 +431,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-sdk-ssooidc"
|
||||
version = "1.12.0"
|
||||
version = "1.22.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "39ddccf01d82fce9b4a15c8ae8608211ee7db8ed13a70b514bbfe41df3d24841"
|
||||
checksum = "73400dc239d14f63d932f4ca7b55af5e9ef1f857f7d70655249ccc287adb2570"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-runtime",
|
||||
@@ -446,9 +453,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-sdk-sts"
|
||||
version = "1.12.0"
|
||||
version = "1.22.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1a591f8c7e6a621a501b2b5d2e88e1697fcb6274264523a6ad4d5959889a41ce"
|
||||
checksum = "10f8858308af76fba3e5ffcf1bb56af5471574d2bdfaf0159470c25bc2f760e5"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-runtime",
|
||||
@@ -469,9 +476,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-sigv4"
|
||||
version = "1.2.0"
|
||||
version = "1.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "11d6f29688a4be9895c0ba8bef861ad0c0dac5c15e9618b9b7a6c233990fc263"
|
||||
checksum = "58b56f1cbe6fd4d0c2573df72868f20ab1c125ca9c9dbce17927a463433a2e57"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-smithy-eventstream",
|
||||
@@ -498,9 +505,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-smithy-async"
|
||||
version = "1.1.8"
|
||||
version = "1.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d26ea8fa03025b2face2b3038a63525a10891e3d8829901d502e5384a0d8cd46"
|
||||
checksum = "62220bc6e97f946ddd51b5f1361f78996e704677afc518a4ff66b7a72ea1378c"
|
||||
dependencies = [
|
||||
"futures-util",
|
||||
"pin-project-lite",
|
||||
@@ -509,9 +516,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-smithy-checksums"
|
||||
version = "0.60.4"
|
||||
version = "0.60.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "be2acd1b9c6ae5859999250ed5a62423aedc5cf69045b844432de15fa2f31f2b"
|
||||
checksum = "83fa43bc04a6b2441968faeab56e68da3812f978a670a5db32accbdcafddd12f"
|
||||
dependencies = [
|
||||
"aws-smithy-http",
|
||||
"aws-smithy-types",
|
||||
@@ -541,9 +548,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-smithy-http"
|
||||
version = "0.60.7"
|
||||
version = "0.60.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3f10fa66956f01540051b0aa7ad54574640f748f9839e843442d99b970d3aff9"
|
||||
checksum = "4a7de001a1b9a25601016d8057ea16e31a45fdca3751304c8edf4ad72e706c08"
|
||||
dependencies = [
|
||||
"aws-smithy-eventstream",
|
||||
"aws-smithy-runtime-api",
|
||||
@@ -581,9 +588,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-smithy-runtime"
|
||||
version = "1.1.8"
|
||||
version = "1.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ec81002d883e5a7fd2bb063d6fb51c4999eb55d404f4fff3dd878bf4733b9f01"
|
||||
checksum = "c9ac79e9f3a4d576f3cd4a470a0275b138d9e7b11b1cd514a6858ae0a79dd5bb"
|
||||
dependencies = [
|
||||
"aws-smithy-async",
|
||||
"aws-smithy-http",
|
||||
@@ -594,6 +601,7 @@ dependencies = [
|
||||
"h2 0.3.26",
|
||||
"http 0.2.9",
|
||||
"http-body 0.4.5",
|
||||
"http-body 1.0.0",
|
||||
"hyper 0.14.26",
|
||||
"hyper-rustls 0.24.0",
|
||||
"once_cell",
|
||||
@@ -606,9 +614,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-smithy-runtime-api"
|
||||
version = "1.2.0"
|
||||
version = "1.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9acb931e0adaf5132de878f1398d83f8677f90ba70f01f65ff87f6d7244be1c5"
|
||||
checksum = "04ec42c2f5c0e7796a2848dde4d9f3bf8ce12ccbb3d5aa40c52fa0cdd61a1c47"
|
||||
dependencies = [
|
||||
"aws-smithy-async",
|
||||
"aws-smithy-types",
|
||||
@@ -623,16 +631,19 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-smithy-types"
|
||||
version = "1.1.8"
|
||||
version = "1.1.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "abe14dceea1e70101d38fbf2a99e6a34159477c0fb95e68e05c66bd7ae4c3729"
|
||||
checksum = "baf98d97bba6ddaba180f1b1147e202d8fe04940403a95a3f826c790f931bbd1"
|
||||
dependencies = [
|
||||
"base64-simd",
|
||||
"bytes",
|
||||
"bytes-utils",
|
||||
"futures-core",
|
||||
"http 0.2.9",
|
||||
"http 1.1.0",
|
||||
"http-body 0.4.5",
|
||||
"http-body 1.0.0",
|
||||
"http-body-util",
|
||||
"itoa",
|
||||
"num-integer",
|
||||
"pin-project-lite",
|
||||
@@ -646,18 +657,18 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-smithy-xml"
|
||||
version = "0.60.7"
|
||||
version = "0.60.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "872c68cf019c0e4afc5de7753c4f7288ce4b71663212771bf5e4542eb9346ca9"
|
||||
checksum = "d123fbc2a4adc3c301652ba8e149bf4bc1d1725affb9784eb20c953ace06bf55"
|
||||
dependencies = [
|
||||
"xmlparser",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "aws-types"
|
||||
version = "1.1.8"
|
||||
version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0dbf2f3da841a8930f159163175cf6a3d16ddde517c1b0fba7aa776822800f40"
|
||||
checksum = "5a43b56df2c529fe44cb4d92bd64d0479883fb9608ff62daede4df5405381814"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-smithy-async",
|
||||
@@ -2935,6 +2946,15 @@ version = "0.4.20"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
|
||||
|
||||
[[package]]
|
||||
name = "lru"
|
||||
version = "0.12.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc"
|
||||
dependencies = [
|
||||
"hashbrown 0.14.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "match_cfg"
|
||||
version = "0.1.0"
|
||||
|
||||
14
Cargo.toml
14
Cargo.toml
@@ -52,14 +52,14 @@ azure_storage_blobs = "0.19"
|
||||
flate2 = "1.0.26"
|
||||
async-stream = "0.3"
|
||||
async-trait = "0.1"
|
||||
aws-config = { version = "1.1.4", default-features = false, features=["rustls"] }
|
||||
aws-sdk-s3 = "1.14"
|
||||
aws-config = { version = "1.3", default-features = false, features=["rustls"] }
|
||||
aws-sdk-s3 = "1.26"
|
||||
aws-sdk-iam = "1.15.0"
|
||||
aws-smithy-async = { version = "1.1.4", default-features = false, features=["rt-tokio"] }
|
||||
aws-smithy-types = "1.1.4"
|
||||
aws-credential-types = "1.1.4"
|
||||
aws-sigv4 = { version = "1.2.0", features = ["sign-http"] }
|
||||
aws-types = "1.1.7"
|
||||
aws-smithy-async = { version = "1.2.1", default-features = false, features=["rt-tokio"] }
|
||||
aws-smithy-types = "1.1.9"
|
||||
aws-credential-types = "1.2.0"
|
||||
aws-sigv4 = { version = "1.2.1", features = ["sign-http"] }
|
||||
aws-types = "1.2.0"
|
||||
axum = { version = "0.6.20", features = ["ws"] }
|
||||
base64 = "0.13.0"
|
||||
bincode = "1.3"
|
||||
|
||||
@@ -27,7 +27,7 @@ use aws_config::{
|
||||
};
|
||||
use aws_credential_types::provider::SharedCredentialsProvider;
|
||||
use aws_sdk_s3::{
|
||||
config::{AsyncSleep, Builder, IdentityCache, Region, SharedAsyncSleep},
|
||||
config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep},
|
||||
error::SdkError,
|
||||
operation::get_object::GetObjectError,
|
||||
types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass},
|
||||
@@ -75,13 +75,13 @@ struct GetObjectRequest {
|
||||
}
|
||||
impl S3Bucket {
|
||||
/// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
|
||||
pub fn new(aws_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
|
||||
pub fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
|
||||
tracing::debug!(
|
||||
"Creating s3 remote storage for S3 bucket {}",
|
||||
aws_config.bucket_name
|
||||
remote_storage_config.bucket_name
|
||||
);
|
||||
|
||||
let region = Some(Region::new(aws_config.bucket_region.clone()));
|
||||
let region = Some(Region::new(remote_storage_config.bucket_region.clone()));
|
||||
|
||||
let provider_conf = ProviderConfig::without_region().with_region(region.clone());
|
||||
|
||||
@@ -113,6 +113,38 @@ impl S3Bucket {
|
||||
// AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
|
||||
let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
|
||||
|
||||
let sdk_config_loader: aws_config::ConfigLoader = aws_config::defaults(
|
||||
#[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
|
||||
BehaviorVersion::v2023_11_09(),
|
||||
)
|
||||
.region(region)
|
||||
.identity_cache(IdentityCache::lazy().build())
|
||||
.credentials_provider(SharedCredentialsProvider::new(credentials_provider))
|
||||
.sleep_impl(SharedAsyncSleep::from(sleep_impl));
|
||||
|
||||
let sdk_config: aws_config::SdkConfig = std::thread::scope(|s| {
|
||||
s.spawn(|| {
|
||||
// TODO: make this function async.
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap()
|
||||
.block_on(sdk_config_loader.load())
|
||||
})
|
||||
.join()
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&sdk_config);
|
||||
|
||||
// Technically, the `remote_storage_config.endpoint` field only applies to S3 interactions.
|
||||
// (In case we ever re-use the `sdk_config` for more than just the S3 client in the future)
|
||||
if let Some(custom_endpoint) = remote_storage_config.endpoint.clone() {
|
||||
s3_config_builder = s3_config_builder
|
||||
.endpoint_url(custom_endpoint)
|
||||
.force_path_style(true);
|
||||
}
|
||||
|
||||
// We do our own retries (see [`backoff::retry`]). However, for the AWS SDK to enable rate limiting in response to throttling
|
||||
// responses (e.g. 429 on too many ListObjectsv2 requests), we must provide a retry config. We set it to use at most one
|
||||
// attempt, and enable 'Adaptive' mode, which causes rate limiting to be enabled.
|
||||
@@ -120,42 +152,36 @@ impl S3Bucket {
|
||||
retry_config
|
||||
.set_max_attempts(Some(1))
|
||||
.set_mode(Some(RetryMode::Adaptive));
|
||||
s3_config_builder = s3_config_builder.retry_config(retry_config.build());
|
||||
|
||||
let mut config_builder = Builder::default()
|
||||
.behavior_version(BehaviorVersion::v2023_11_09())
|
||||
.region(region)
|
||||
.identity_cache(IdentityCache::lazy().build())
|
||||
.credentials_provider(SharedCredentialsProvider::new(credentials_provider))
|
||||
.retry_config(retry_config.build())
|
||||
.sleep_impl(SharedAsyncSleep::from(sleep_impl));
|
||||
let s3_config = s3_config_builder.build();
|
||||
let client = aws_sdk_s3::Client::from_conf(s3_config);
|
||||
|
||||
if let Some(custom_endpoint) = aws_config.endpoint.clone() {
|
||||
config_builder = config_builder
|
||||
.endpoint_url(custom_endpoint)
|
||||
.force_path_style(true);
|
||||
}
|
||||
let prefix_in_bucket = remote_storage_config
|
||||
.prefix_in_bucket
|
||||
.as_deref()
|
||||
.map(|prefix| {
|
||||
let mut prefix = prefix;
|
||||
while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
|
||||
prefix = &prefix[1..]
|
||||
}
|
||||
|
||||
let client = Client::from_conf(config_builder.build());
|
||||
let mut prefix = prefix.to_string();
|
||||
while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
|
||||
prefix.pop();
|
||||
}
|
||||
prefix
|
||||
});
|
||||
|
||||
let prefix_in_bucket = aws_config.prefix_in_bucket.as_deref().map(|prefix| {
|
||||
let mut prefix = prefix;
|
||||
while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
|
||||
prefix = &prefix[1..]
|
||||
}
|
||||
|
||||
let mut prefix = prefix.to_string();
|
||||
while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
|
||||
prefix.pop();
|
||||
}
|
||||
prefix
|
||||
});
|
||||
Ok(Self {
|
||||
client,
|
||||
bucket_name: aws_config.bucket_name.clone(),
|
||||
max_keys_per_list_response: aws_config.max_keys_per_list_response,
|
||||
bucket_name: remote_storage_config.bucket_name.clone(),
|
||||
max_keys_per_list_response: remote_storage_config.max_keys_per_list_response,
|
||||
prefix_in_bucket,
|
||||
concurrency_limiter: ConcurrencyLimiter::new(aws_config.concurrency_limit.get()),
|
||||
upload_storage_class: aws_config.upload_storage_class.clone(),
|
||||
concurrency_limiter: ConcurrencyLimiter::new(
|
||||
remote_storage_config.concurrency_limit.get(),
|
||||
),
|
||||
upload_storage_class: remote_storage_config.upload_storage_class.clone(),
|
||||
timeout,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -24,7 +24,9 @@ use tracing::{debug, info};
|
||||
use std::collections::{HashSet, VecDeque};
|
||||
use std::ops::Range;
|
||||
|
||||
use crate::helpers::{accum_key_values, keyspace_total_size, merge_delta_keys, overlaps_with};
|
||||
use crate::helpers::{
|
||||
accum_key_values, keyspace_total_size, merge_delta_keys_buffered, overlaps_with,
|
||||
};
|
||||
use crate::interface::*;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -535,7 +537,10 @@ where
|
||||
}
|
||||
}
|
||||
// Open stream
|
||||
let key_value_stream = std::pin::pin!(merge_delta_keys::<E>(deltas.as_slice(), ctx));
|
||||
let key_value_stream =
|
||||
std::pin::pin!(merge_delta_keys_buffered::<E>(deltas.as_slice(), ctx)
|
||||
.await?
|
||||
.map(Result::<_, anyhow::Error>::Ok));
|
||||
let mut new_jobs = Vec::new();
|
||||
|
||||
// Slide a window through the keyspace
|
||||
|
||||
@@ -14,6 +14,7 @@ use std::future::Future;
|
||||
use std::ops::{DerefMut, Range};
|
||||
use std::pin::Pin;
|
||||
use std::task::{ready, Poll};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
pub fn keyspace_total_size<K>(
|
||||
keyspace: &CompactionKeySpace<K>,
|
||||
@@ -109,17 +110,40 @@ pub fn merge_delta_keys<'a, E: CompactionJobExecutor>(
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn merge_delta_keys_buffered<'a, E: CompactionJobExecutor + 'a>(
|
||||
layers: &'a [E::DeltaLayer],
|
||||
ctx: &'a E::RequestContext,
|
||||
) -> anyhow::Result<impl Stream<Item = <E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>>
|
||||
{
|
||||
let mut keys = Vec::new();
|
||||
for l in layers {
|
||||
// Boxing and casting to LoadFuture is required to obtain the right Sync bound.
|
||||
// If we do l.load_keys(ctx).await? directly, there is a compilation error.
|
||||
let load_future: LoadFuture<'a, _> = Box::pin(l.load_keys(ctx));
|
||||
keys.extend(load_future.await?.into_iter());
|
||||
}
|
||||
keys.sort_by_key(|k| (k.key(), k.lsn()));
|
||||
let stream = futures::stream::iter(keys.into_iter());
|
||||
Ok(stream)
|
||||
}
|
||||
|
||||
enum LazyLoadLayer<'a, E: CompactionJobExecutor> {
|
||||
Loaded(VecDeque<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>),
|
||||
Unloaded(&'a E::DeltaLayer),
|
||||
}
|
||||
impl<'a, E: CompactionJobExecutor> LazyLoadLayer<'a, E> {
|
||||
fn key(&self) -> E::Key {
|
||||
fn min_key(&self) -> E::Key {
|
||||
match self {
|
||||
Self::Loaded(entries) => entries.front().unwrap().key(),
|
||||
Self::Unloaded(dl) => dl.key_range().start,
|
||||
}
|
||||
}
|
||||
fn min_lsn(&self) -> Lsn {
|
||||
match self {
|
||||
Self::Loaded(entries) => entries.front().unwrap().lsn(),
|
||||
Self::Unloaded(dl) => dl.lsn_range().start,
|
||||
}
|
||||
}
|
||||
}
|
||||
impl<'a, E: CompactionJobExecutor> PartialOrd for LazyLoadLayer<'a, E> {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
|
||||
@@ -129,12 +153,12 @@ impl<'a, E: CompactionJobExecutor> PartialOrd for LazyLoadLayer<'a, E> {
|
||||
impl<'a, E: CompactionJobExecutor> Ord for LazyLoadLayer<'a, E> {
|
||||
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
|
||||
// reverse order so that we get a min-heap
|
||||
other.key().cmp(&self.key())
|
||||
(other.min_key(), other.min_lsn()).cmp(&(self.min_key(), self.min_lsn()))
|
||||
}
|
||||
}
|
||||
impl<'a, E: CompactionJobExecutor> PartialEq for LazyLoadLayer<'a, E> {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.key().eq(&other.key())
|
||||
self.cmp(other) == std::cmp::Ordering::Equal
|
||||
}
|
||||
}
|
||||
impl<'a, E: CompactionJobExecutor> Eq for LazyLoadLayer<'a, E> {}
|
||||
|
||||
@@ -1220,11 +1220,17 @@ impl Timeline {
|
||||
}
|
||||
reconstruct_timer.stop_and_record();
|
||||
|
||||
// Note that this is an approximation. Tracking the exact number of layers visited
|
||||
// per key requires virtually unbounded memory usage and is inefficient
|
||||
// (i.e. segment tree tracking each range queried from a layer)
|
||||
crate::metrics::VEC_READ_NUM_LAYERS_VISITED
|
||||
.observe(layers_visited as f64 / results.len() as f64);
|
||||
// For aux file keys (v1 or v2) the vectored read path does not return an error
|
||||
// when they're missing. Instead they are omitted from the resulting btree
|
||||
// (this is a requirement, not a bug). Skip updating the metric in these cases
|
||||
// to avoid infinite results.
|
||||
if !results.is_empty() {
|
||||
// Note that this is an approximation. Tracking the exact number of layers visited
|
||||
// per key requires virtually unbounded memory usage and is inefficient
|
||||
// (i.e. segment tree tracking each range queried from a layer)
|
||||
crate::metrics::VEC_READ_NUM_LAYERS_VISITED
|
||||
.observe(layers_visited as f64 / results.len() as f64);
|
||||
}
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
@@ -3,5 +3,4 @@ comment = 'helpers for neon testing and debugging'
|
||||
default_version = '1.1'
|
||||
module_pathname = '$libdir/neon_test_utils'
|
||||
relocatable = true
|
||||
trusted = false
|
||||
superuser = true
|
||||
trusted = true
|
||||
|
||||
@@ -312,7 +312,10 @@ pub fn init_s3_client(account_id: Option<String>, bucket_region: Region) -> Clie
|
||||
let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
|
||||
|
||||
let mut builder = Config::builder()
|
||||
.behavior_version(BehaviorVersion::v2023_11_09())
|
||||
.behavior_version(
|
||||
#[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
|
||||
BehaviorVersion::v2023_11_09(),
|
||||
)
|
||||
.region(bucket_region)
|
||||
.retry_config(RetryConfig::adaptive().with_max_attempts(3))
|
||||
.sleep_impl(SharedAsyncSleep::from(sleep_impl))
|
||||
|
||||
@@ -4745,7 +4745,7 @@ impl Service {
|
||||
// them in an optimization
|
||||
const DOWNLOAD_FRESHNESS_THRESHOLD: u64 = 10 * 1024 * 1024 * 1024;
|
||||
|
||||
if progress.bytes_total == 0
|
||||
if progress.heatmap_mtime.is_none()
|
||||
|| progress.bytes_total < DOWNLOAD_FRESHNESS_THRESHOLD
|
||||
&& progress.bytes_downloaded != progress.bytes_total
|
||||
|| progress.bytes_total - progress.bytes_downloaded
|
||||
|
||||
@@ -92,6 +92,166 @@ Exit after the first test failure:
|
||||
`./scripts/pytest -x ...`
|
||||
(there are many more pytest options; run `pytest -h` to see them.)
|
||||
|
||||
#### Running Python tests against real S3 or S3-compatible services
|
||||
|
||||
Neon's `libs/remote_storage` supports multiple implementations of remote storage.
|
||||
At the time of writing, that is
|
||||
```rust
|
||||
pub enum RemoteStorageKind {
|
||||
/// Storage based on local file system.
|
||||
/// Specify a root folder to place all stored files into.
|
||||
LocalFs(Utf8PathBuf),
|
||||
/// AWS S3 based storage, storing all files in the S3 bucket
|
||||
/// specified by the config
|
||||
AwsS3(S3Config),
|
||||
/// Azure Blob based storage, storing all files in the container
|
||||
/// specified by the config
|
||||
AzureContainer(AzureConfig),
|
||||
}
|
||||
```
|
||||
|
||||
The test suite has a Python enum with equal name but different meaning:
|
||||
|
||||
```python
|
||||
@enum.unique
|
||||
class RemoteStorageKind(str, enum.Enum):
|
||||
LOCAL_FS = "local_fs"
|
||||
MOCK_S3 = "mock_s3"
|
||||
REAL_S3 = "real_s3"
|
||||
```
|
||||
|
||||
* `LOCAL_FS` => `LocalFs`
|
||||
* `MOCK_S3`: starts [`moto`](https://github.com/getmoto/moto)'s S3 implementation, then configures Pageserver with `AwsS3`
|
||||
* `REAL_S3` => configure `AwsS3` as detailed below
|
||||
|
||||
When a test in the test suite needs an `AwsS3`, it is supposed to call `remote_storage.s3_storage()`.
|
||||
That function checks env var `ENABLE_REAL_S3_REMOTE_STORAGE`:
|
||||
* If it is not set, use `MOCK_S3`
|
||||
* If it is set, use `REAL_S3`.
|
||||
|
||||
For `REAL_S3`, the test suite creates the dict/toml representation of the `RemoteStorageKind::AwsS3` based on env vars:
|
||||
|
||||
```rust
|
||||
pub struct S3Config {
|
||||
// test suite env var: REMOTE_STORAGE_S3_BUCKET
|
||||
pub bucket_name: String,
|
||||
// test suite env var: REMOTE_STORAGE_S3_REGION
|
||||
pub bucket_region: String,
|
||||
// test suite determines this
|
||||
pub prefix_in_bucket: Option<String>,
|
||||
// no env var exists; test suite sets it for MOCK_S3, because that's how moto works
|
||||
pub endpoint: Option<String>,
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
*Credentials* are not part of the config, but discovered by the AWS SDK.
|
||||
See the `libs/remote_storage` Rust code.
|
||||
We're documenting two mechanism here:
|
||||
|
||||
The test suite supports two mechanisms (`remote_storage.py`):
|
||||
|
||||
**Credential mechanism 1**: env vars `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`.
|
||||
Populate the env vars with AWS access keys that you created in IAM.
|
||||
Our CI uses this mechanism.
|
||||
However, it is _not_ recommended for interactive use by developers ([learn more](https://docs.aws.amazon.com/sdkref/latest/guide/access-users.html#credentials-long-term)).
|
||||
Instead, use profiles (next section).
|
||||
|
||||
**Credential mechanism 2**: env var `AWS_PROFILE`.
|
||||
This uses the AWS SDK's (and CLI's) profile mechanism.
|
||||
Learn more about it [in the official docs](https://docs.aws.amazon.com/sdkref/latest/guide/file-format.html).
|
||||
After configuring a profile (e.g. via the aws CLI), set the env var to its name.
|
||||
|
||||
In conclusion, the full command line is:
|
||||
|
||||
```bash
|
||||
# with long-term AWS access keys
|
||||
ENABLE_REAL_S3_REMOTE_STORAGE=true \
|
||||
REMOTE_STORAGE_S3_BUCKET=mybucket \
|
||||
REMOTE_STORAGE_S3_REGION=eu-central-1 \
|
||||
AWS_ACCESS_KEY_ID=... \
|
||||
AWS_SECRET_ACCESS_KEY=... \
|
||||
./scripts/pytest
|
||||
```
|
||||
<!-- Don't forget to update the Minio example when changing these -->
|
||||
```bash
|
||||
# with AWS PROFILE
|
||||
ENABLE_REAL_S3_REMOTE_STORAGE=true \
|
||||
REMOTE_STORAGE_S3_BUCKET=mybucket \
|
||||
REMOTE_STORAGE_S3_REGION=eu-central-1 \
|
||||
AWS_PROFILE=... \
|
||||
./scripts/pytest
|
||||
```
|
||||
|
||||
If you're using SSO, make sure to `aws sso login --profile $AWS_PROFILE` first.
|
||||
|
||||
##### Minio
|
||||
|
||||
If you want to run test without the cloud setup, we recommend [minio](https://min.io/docs/minio/linux/index.html).
|
||||
|
||||
```bash
|
||||
# Start in Terminal 1
|
||||
mkdir /tmp/minio_data
|
||||
minio server /tmp/minio_data --console-address 127.0.0.1:9001 --address 127.0.0.1:9000
|
||||
```
|
||||
|
||||
In another terminal, create an `aws` CLI profile for it:
|
||||
|
||||
```ini
|
||||
# append to ~/.aws/config
|
||||
[profile local-minio]
|
||||
services = local-minio-services
|
||||
[services local-minio-services]
|
||||
s3 =
|
||||
endpoint_url=http://127.0.0.1:9000/
|
||||
```
|
||||
|
||||
|
||||
Now configure the credentials (this is going to write `~/.aws/credentials` for you).
|
||||
It's an interactive prompt.
|
||||
|
||||
```bash
|
||||
# Terminal 2
|
||||
$ aws --profile local-minio configure
|
||||
AWS Access Key ID [None]: minioadmin
|
||||
AWS Secret Access Key [None]: minioadmin
|
||||
Default region name [None]:
|
||||
Default output format [None]:
|
||||
```
|
||||
|
||||
Now create a bucket `testbucket` using the CLI.
|
||||
|
||||
```bash
|
||||
# (don't forget to have AWS_PROFILE env var set; or use --profile)
|
||||
aws --profile local-minio s3 mb s3://mybucket
|
||||
```
|
||||
|
||||
(If it doesn't work, make sure you update your AWS CLI to a recent version.
|
||||
The [service-specific endpoint feature](https://docs.aws.amazon.com/sdkref/latest/guide/feature-ss-endpoints.html)
|
||||
that we're using is quite new.)
|
||||
|
||||
```bash
|
||||
# with AWS PROFILE
|
||||
ENABLE_REAL_S3_REMOTE_STORAGE=true \
|
||||
REMOTE_STORAGE_S3_BUCKET=mybucket \
|
||||
REMOTE_STORAGE_S3_REGION=doesntmatterforminio \
|
||||
AWS_PROFILE=local-minio \
|
||||
./scripts/pytest
|
||||
```
|
||||
|
||||
NB: you can avoid the `--profile` by setting the `AWS_PROFILE` variable.
|
||||
Just like the AWS SDKs, the `aws` CLI is sensible to it.
|
||||
|
||||
#### Running Rust tests against real S3 or S3-compatible services
|
||||
|
||||
We have some Rust tests that only run against real S3, e.g., [here](https://github.com/neondatabase/neon/blob/c18d3340b5e3c978a81c3db8b6f1e83cd9087e8a/libs/remote_storage/tests/test_real_s3.rs#L392-L397).
|
||||
|
||||
They use the same env vars as the Python test suite (see previous section)
|
||||
but interpret them on their own.
|
||||
However, at this time, the interpretation is identical.
|
||||
|
||||
So, above instructions apply to the Rust test as well.
|
||||
|
||||
### Writing a test
|
||||
|
||||
Every test needs a Neon Environment, or NeonEnv to operate in. A Neon Environment
|
||||
|
||||
@@ -701,6 +701,11 @@ class NeonEnvBuilder:
|
||||
config["default_tenant_id"] = snapshot_config["default_tenant_id"]
|
||||
config["branch_name_mappings"] = snapshot_config["branch_name_mappings"]
|
||||
|
||||
# Update the config with new neon + postgres path in case of compat test
|
||||
# FIXME: overriding pg_distrib_dir cause storage controller fail to start
|
||||
# config["pg_distrib_dir"] = str(self.pg_distrib_dir)
|
||||
config["neon_distrib_dir"] = str(self.neon_binpath)
|
||||
|
||||
with (self.repo_dir / "config").open("w") as f:
|
||||
toml.dump(config, f)
|
||||
|
||||
|
||||
@@ -88,7 +88,7 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
|
||||
".*Flushed oversized open layer with size.*",
|
||||
# During teardown, we stop the storage controller before the pageservers, so pageservers
|
||||
# can experience connection errors doing background deletion queue work.
|
||||
".*WARN deletion backend: calling control plane generation validation API failed.*Connection refused.*",
|
||||
".*WARN deletion backend: calling control plane generation validation API failed.*error sending request.*",
|
||||
# Can happen when the test shuts down the storage controller while it is calling the utilization API
|
||||
".*WARN.*path=/v1/utilization .*request was dropped before completing",
|
||||
)
|
||||
|
||||
@@ -102,6 +102,9 @@ def test_storage_controller_many_tenants(
|
||||
tenant_id,
|
||||
shard_count,
|
||||
stripe_size,
|
||||
# Upload heatmaps fast, so that secondary downloads happen promptly, enabling
|
||||
# the controller's optimization migrations to proceed promptly.
|
||||
tenant_config={"heatmap_period": "10s"},
|
||||
placement_policy={"Attached": 1},
|
||||
)
|
||||
futs.append(f)
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Npgsql" Version="8.0.2" />
|
||||
<PackageReference Include="Npgsql" Version="8.0.3" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
||||
@@ -168,15 +168,16 @@ def test_vm_bit_clear_on_heap_lock(neon_env_builder: NeonEnvBuilder):
|
||||
# The VM page in shared buffer cache, and the same page as reconstructed
|
||||
# by the pageserver, should be equal.
|
||||
#
|
||||
# Ignore the LSN on the page though (first 8 bytes). If the dirty
|
||||
# VM page is flushed from the cache for some reason, it gets WAL-logged,
|
||||
# which changes the LSN on the page.
|
||||
# Ignore page header (24 bytes) of visibility map.
|
||||
# If the dirty VM page is flushed from the cache for some reason,
|
||||
# it gets WAL-logged, which changes the LSN on the page.
|
||||
# Also in neon SMGR we can replace empty heap page with zero (uninitialized) heap page.
|
||||
cur.execute("select get_raw_page( 'vmtest_lock', 'vm', 0 )")
|
||||
vm_page_in_cache = (cur.fetchall()[0][0])[8:100].hex()
|
||||
vm_page_in_cache = (cur.fetchall()[0][0])[24:100].hex()
|
||||
cur.execute(
|
||||
"select get_raw_page_at_lsn( 'vmtest_lock', 'vm', 0, pg_current_wal_insert_lsn(), NULL )"
|
||||
)
|
||||
vm_page_at_pageserver = (cur.fetchall()[0][0])[8:100].hex()
|
||||
vm_page_at_pageserver = (cur.fetchall()[0][0])[24:100].hex()
|
||||
|
||||
assert vm_page_at_pageserver == vm_page_in_cache
|
||||
|
||||
|
||||
@@ -19,7 +19,7 @@ aws-runtime = { version = "1", default-features = false, features = ["event-stre
|
||||
aws-sigv4 = { version = "1", features = ["http0-compat", "sign-eventstream", "sigv4a"] }
|
||||
aws-smithy-async = { version = "1", default-features = false, features = ["rt-tokio"] }
|
||||
aws-smithy-http = { version = "0.60", default-features = false, features = ["event-stream"] }
|
||||
aws-smithy-types = { version = "1", default-features = false, features = ["byte-stream-poll-next", "http-body-0-4-x", "rt-tokio", "test-util"] }
|
||||
aws-smithy-types = { version = "1", default-features = false, features = ["byte-stream-poll-next", "http-body-0-4-x", "http-body-1-x", "rt-tokio", "test-util"] }
|
||||
axum = { version = "0.6", features = ["ws"] }
|
||||
base64 = { version = "0.21", features = ["alloc"] }
|
||||
base64ct = { version = "1", default-features = false, features = ["std"] }
|
||||
|
||||
Reference in New Issue
Block a user