Compare commits

..

5 Commits

Author SHA1 Message Date
Folke Behrens
0896dcf3d4 proxy: Add temporary stream_events tool
neondatabase/cloud#19600
2024-11-14 11:49:07 +01:00
Arseny Sher
d06bf4b0fe safekeeper: fix atomicity of WAL truncation (#9685)
If WAL truncation fails in the middle it might leave some data on disk
above the write/flush LSN. In theory, concatenated with previous records
it might form bogus WAL (though very unlikely in practice because CRC
would protect from that). To protect from that, set
pending_wal_truncation flag: means before any WAL writes truncation must
be retried until it succeeds. We already did that in case of safekeeper
restart, now extend this mechanism for failures without restart. Also,
importantly, reset LSNs in the beginning of the operation, not in the
end, because once on disk deletion starts previous pointers are wrong.

All this most likely haven't created any problems in practice because
CRC protects from the consequences.

Tests for this are hard; simulation infrastructure might be useful here
in the future, but not yet.
2024-11-14 13:06:42 +03:00
Tristan Partin
1280b708f1 Improve error handling for NeonAPI fixture
Move error handling to the common request function and add a debug log.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2024-11-13 20:35:48 -06:00
John Spray
b4e00b8b22 pageserver: refuse to load tenants with suspiciously old indices in old generations (#9719)
## Problem

Historically, if a control component passed a pageserver "generation: 1"
this could be a quick way to corrupt a tenant by loading a historic
index.

Follows https://github.com/neondatabase/neon/pull/9383
Closes #6951 

## Summary of changes

- Introduce a Fatal variant to DownloadError, to enable index downloads
to signal when they have encountered a scary enough situation that we
shouldn't proceed to load the tenant.
- Handle this variant by putting the tenant into a broken state (no
matter which timeline within the tenant reported it)
- Add a test for this case

In the event that this behavior fires when we don't want it to, we have
ways to intervene:
- "Touch" an affected index to update its mtime (download+upload S3
object)
- If this behavior is triggered, it indicates we're attaching in some
old generation, so we should be able to fix that by manually bumping
generation numbers in the storage controller database (this should never
happen, but it's an option if it does)
2024-11-13 18:07:39 +00:00
Heikki Linnakangas
10aaa3677d PostgreSQL minor version updates (17.1, 16.5, 15.9, 14.14) (#9727)
This includes a patch to temporarily disable one test in the pg_anon
test suite. It is an upstream issue, the test started failing with the
new PostgreSQL minor versions because of a change in the default
timezone used in tests. We don't want to block the release for this,
so just disable the test for now. See
199f0a392b (note_2148017485)

Corresponding postgres repository PRs:
https://github.com/neondatabase/postgres/pull/524
https://github.com/neondatabase/postgres/pull/525
https://github.com/neondatabase/postgres/pull/526
https://github.com/neondatabase/postgres/pull/527
2024-11-13 15:08:58 +02:00
20 changed files with 667 additions and 232 deletions

4
Cargo.lock generated
View File

@@ -994,9 +994,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "1.8.0"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da"
checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223"
dependencies = [
"serde",
]

View File

@@ -55,6 +55,7 @@ RUN set -e \
--bin proxy \
--bin neon_local \
--bin storage_scrubber \
--bin stream_events \
--locked --release
# Build final image
@@ -82,6 +83,7 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_control
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/neon_local /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_scrubber /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/stream_events /usr/local/bin
COPY --from=pg-build /home/nonroot/pg_install/v14 /usr/local/v14/
COPY --from=pg-build /home/nonroot/pg_install/v15 /usr/local/v15/

View File

@@ -1,3 +1,45 @@
commit 00aa659afc9c7336ab81036edec3017168aabf40
Author: Heikki Linnakangas <heikki@neon.tech>
Date: Tue Nov 12 16:59:19 2024 +0200
Temporarily disable test that depends on timezone
diff --git a/tests/expected/generalization.out b/tests/expected/generalization.out
index 23ef5fa..9e60deb 100644
--- a/ext-src/pg_anon-src/tests/expected/generalization.out
+++ b/ext-src/pg_anon-src/tests/expected/generalization.out
@@ -284,12 +284,9 @@ SELECT anon.generalize_tstzrange('19041107','century');
["Tue Jan 01 00:00:00 1901 PST","Mon Jan 01 00:00:00 2001 PST")
(1 row)
-SELECT anon.generalize_tstzrange('19041107','millennium');
- generalize_tstzrange
------------------------------------------------------------------
- ["Thu Jan 01 00:00:00 1001 PST","Mon Jan 01 00:00:00 2001 PST")
-(1 row)
-
+-- temporarily disabled, see:
+-- https://gitlab.com/dalibo/postgresql_anonymizer/-/commit/199f0a392b37c59d92ae441fb8f037e094a11a52#note_2148017485
+--SELECT anon.generalize_tstzrange('19041107','millennium');
-- generalize_daterange
SELECT anon.generalize_daterange('19041107');
generalize_daterange
diff --git a/tests/sql/generalization.sql b/tests/sql/generalization.sql
index b868344..b4fc977 100644
--- a/ext-src/pg_anon-src/tests/sql/generalization.sql
+++ b/ext-src/pg_anon-src/tests/sql/generalization.sql
@@ -61,7 +61,9 @@ SELECT anon.generalize_tstzrange('19041107','month');
SELECT anon.generalize_tstzrange('19041107','year');
SELECT anon.generalize_tstzrange('19041107','decade');
SELECT anon.generalize_tstzrange('19041107','century');
-SELECT anon.generalize_tstzrange('19041107','millennium');
+-- temporarily disabled, see:
+-- https://gitlab.com/dalibo/postgresql_anonymizer/-/commit/199f0a392b37c59d92ae441fb8f037e094a11a52#note_2148017485
+--SELECT anon.generalize_tstzrange('19041107','millennium');
-- generalize_daterange
SELECT anon.generalize_daterange('19041107');
commit 7dd414ee75f2875cffb1d6ba474df1f135a6fc6f
Author: Alexey Masterov <alexeymasterov@neon.tech>
Date: Fri May 31 06:34:26 2024 +0000

View File

@@ -15,6 +15,9 @@ pub enum DownloadError {
///
/// Concurrency control is not timed within timeout.
Timeout,
/// Some integrity/consistency check failed during download. This is used during
/// timeline loads to cancel the load of a tenant if some timeline detects fatal corruption.
Fatal(String),
/// The file was found in the remote storage, but the download failed.
Other(anyhow::Error),
}
@@ -29,6 +32,7 @@ impl std::fmt::Display for DownloadError {
DownloadError::Unmodified => write!(f, "File was not modified"),
DownloadError::Cancelled => write!(f, "Cancelled, shutting down"),
DownloadError::Timeout => write!(f, "timeout"),
DownloadError::Fatal(why) => write!(f, "Fatal read error: {why}"),
DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"),
}
}
@@ -41,7 +45,7 @@ impl DownloadError {
pub fn is_permanent(&self) -> bool {
use DownloadError::*;
match self {
BadInput(_) | NotFound | Unmodified | Cancelled => true,
BadInput(_) | NotFound | Unmodified | Fatal(_) | Cancelled => true,
Timeout | Other(_) => false,
}
}

View File

@@ -1433,6 +1433,12 @@ impl Tenant {
info!(%timeline_id, "index_part not found on remote");
continue;
}
Err(DownloadError::Fatal(why)) => {
// If, while loading one remote timeline, we saw an indication that our generation
// number is likely invalid, then we should not load the whole tenant.
error!(%timeline_id, "Fatal error loading timeline: {why}");
anyhow::bail!(why.to_string());
}
Err(e) => {
// Some (possibly ephemeral) error happened during index_part download.
// Pretend the timeline exists to not delete the timeline directory,

View File

@@ -574,12 +574,18 @@ impl RemoteTimelineClient {
if latest_index_generation > index_generation {
// Unexpected! Why are we loading such an old index if a more recent one exists?
tracing::warn!(
// We will refuse to proceed, as there is no reasonable scenario where this should happen, but
// there _is_ a clear bug/corruption scenario where it would happen (controller sets the generation
// backwards).
tracing::error!(
?index_generation,
?latest_index_generation,
?latest_index_mtime,
"Found a newer index while loading an old one"
);
return Err(DownloadError::Fatal(
"Index age exceeds threshold and a newer index exists".into(),
));
}
}

View File

@@ -0,0 +1,450 @@
use std::sync::Arc;
use anyhow::bail;
use aws_config::environment::EnvironmentVariableCredentialsProvider;
use aws_config::imds::credentials::ImdsCredentialsProvider;
use aws_config::meta::credentials::CredentialsProviderChain;
use aws_config::meta::region::RegionProviderChain;
use aws_config::profile::ProfileFileCredentialsProvider;
use aws_config::provider_config::ProviderConfig;
use aws_config::web_identity_token::WebIdentityTokenCredentialsProvider;
use aws_config::Region;
use clap::{Parser, ValueEnum};
use proxy::config::{self, remote_storage_from_toml, ProxyProtocolV2};
use proxy::context::parquet::ParquetUploadArgs;
use proxy::rate_limiter::RateBucketInfo;
use proxy::redis::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
use proxy::redis::elasticache;
use redis::streams::{StreamReadOptions, StreamReadReply};
use redis::{AsyncCommands, FromRedisValue, Value};
use remote_storage::RemoteStorageConfig;
use serde::{Deserialize, Serialize};
use tracing::warn;
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[derive(Clone, Debug, ValueEnum)]
enum AuthBackendType {
#[value(name("console"), alias("cplane"))]
ControlPlane,
#[value(name("link"), alias("control-redirect"))]
ConsoleRedirect,
#[cfg(feature = "testing")]
Postgres,
}
/// Neon proxy/router
#[derive(Parser)]
struct ProxyCliArgs {
/// Name of the region this proxy is deployed in
#[clap(long, default_value_t = String::new())]
region: String,
/// listen for incoming client connections on ip:port
#[clap(short, long, default_value = "127.0.0.1:4432")]
proxy: String,
#[clap(value_enum, long, default_value_t = AuthBackendType::ConsoleRedirect)]
auth_backend: AuthBackendType,
/// listen for management callback connection on ip:port
#[clap(short, long, default_value = "127.0.0.1:7000")]
mgmt: String,
/// listen for incoming http connections (metrics, etc) on ip:port
#[clap(long, default_value = "127.0.0.1:7001")]
http: String,
/// listen for incoming wss connections on ip:port
#[clap(long)]
wss: Option<String>,
/// redirect unauthenticated users to the given uri in case of console redirect auth
#[clap(short, long, default_value = "http://localhost:3000/psql_session/")]
uri: String,
/// cloud API endpoint for authenticating users
#[clap(
short,
long,
default_value = "http://localhost:3000/authenticate_proxy_request/"
)]
auth_endpoint: String,
/// JWT used to connect to control plane.
#[clap(
long,
value_name = "JWT",
default_value = "",
env = "NEON_PROXY_TO_CONTROLPLANE_TOKEN"
)]
control_plane_token: Arc<str>,
/// if this is not local proxy, this toggles whether we accept jwt or passwords for http
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
is_auth_broker: bool,
/// path to TLS key for client postgres connections
///
/// tls-key and tls-cert are for backwards compatibility, we can put all certs in one dir
#[clap(short = 'k', long, alias = "ssl-key")]
tls_key: Option<String>,
/// path to TLS cert for client postgres connections
///
/// tls-key and tls-cert are for backwards compatibility, we can put all certs in one dir
#[clap(short = 'c', long, alias = "ssl-cert")]
tls_cert: Option<String>,
/// path to directory with TLS certificates for client postgres connections
#[clap(long)]
certs_dir: Option<String>,
/// timeout for the TLS handshake
#[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
handshake_timeout: tokio::time::Duration,
/// http endpoint to receive periodic metric updates
#[clap(long)]
metric_collection_endpoint: Option<String>,
/// how often metrics should be sent to a collection endpoint
#[clap(long)]
metric_collection_interval: Option<String>,
/// cache for `wake_compute` api method (use `size=0` to disable)
#[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
wake_compute_cache: String,
/// lock for `wake_compute` api method. example: "shards=32,permits=4,epoch=10m,timeout=1s". (use `permits=0` to disable).
#[clap(long, default_value = config::ConcurrencyLockOptions::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK)]
wake_compute_lock: String,
/// lock for `connect_compute` api method. example: "shards=32,permits=4,epoch=10m,timeout=1s". (use `permits=0` to disable).
#[clap(long, default_value = config::ConcurrencyLockOptions::DEFAULT_OPTIONS_CONNECT_COMPUTE_LOCK)]
connect_compute_lock: String,
/// Allow self-signed certificates for compute nodes (for testing)
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
allow_self_signed_compute: bool,
#[clap(flatten)]
sql_over_http: SqlOverHttpArgs,
/// timeout for scram authentication protocol
#[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
scram_protocol_timeout: tokio::time::Duration,
/// size of the threadpool for password hashing
#[clap(long, default_value_t = 4)]
scram_thread_pool_size: u8,
/// Endpoint rate limiter max number of requests per second.
///
/// Provided in the form `<Requests Per Second>@<Bucket Duration Size>`.
/// Can be given multiple times for different bucket sizes.
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_ENDPOINT_SET)]
endpoint_rps_limit: Vec<RateBucketInfo>,
/// Wake compute rate limiter max number of requests per second.
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_SET)]
wake_compute_limit: Vec<RateBucketInfo>,
/// Whether the auth rate limiter actually takes effect (for testing)
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
auth_rate_limit_enabled: bool,
/// Authentication rate limiter max number of hashes per second.
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_AUTH_SET)]
auth_rate_limit: Vec<RateBucketInfo>,
/// The IP subnet to use when considering whether two IP addresses are considered the same.
#[clap(long, default_value_t = 64)]
auth_rate_limit_ip_subnet: u8,
/// Redis rate limiter max number of requests per second.
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_SET)]
redis_rps_limit: Vec<RateBucketInfo>,
/// cache for `allowed_ips` (use `size=0` to disable)
#[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
allowed_ips_cache: String,
/// cache for `role_secret` (use `size=0` to disable)
#[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
role_secret_cache: String,
/// redis url for notifications (if empty, redis_host:port will be used for both notifications and streaming connections)
#[clap(long)]
redis_notifications: Option<String>,
/// what from the available authentications type to use for the regional redis we have. Supported are "irsa" and "plain".
#[clap(long, default_value = "irsa")]
redis_auth_type: String,
/// redis host for streaming connections (might be different from the notifications host)
#[clap(long)]
redis_host: Option<String>,
/// redis port for streaming connections (might be different from the notifications host)
#[clap(long)]
redis_port: Option<u16>,
/// redis cluster name, used in aws elasticache
#[clap(long)]
redis_cluster_name: Option<String>,
/// redis user_id, used in aws elasticache
#[clap(long)]
redis_user_id: Option<String>,
/// aws region to retrieve credentials
#[clap(long, default_value_t = String::new())]
aws_region: String,
/// cache for `project_info` (use `size=0` to disable)
#[clap(long, default_value = config::ProjectInfoCacheOptions::CACHE_DEFAULT_OPTIONS)]
project_info_cache: String,
/// cache for all valid endpoints
#[clap(long, default_value = config::EndpointCacheConfig::CACHE_DEFAULT_OPTIONS)]
endpoint_cache_config: String,
#[clap(flatten)]
parquet_upload: ParquetUploadArgs,
/// interval for backup metric collection
#[clap(long, default_value = "10m", value_parser = humantime::parse_duration)]
metric_backup_collection_interval: std::time::Duration,
/// remote storage configuration for backup metric collection
/// Encoded as toml (same format as pageservers), eg
/// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}`
#[clap(long, value_parser = remote_storage_from_toml)]
metric_backup_collection_remote_storage: Option<RemoteStorageConfig>,
/// chunk size for backup metric collection
/// Size of each event is no more than 400 bytes, so 2**22 is about 200MB before the compression.
#[clap(long, default_value = "4194304")]
metric_backup_collection_chunk_size: usize,
/// Whether to retry the connection to the compute node
#[clap(long, default_value = config::RetryConfig::CONNECT_TO_COMPUTE_DEFAULT_VALUES)]
connect_to_compute_retry: String,
/// Whether to retry the wake_compute request
#[clap(long, default_value = config::RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)]
wake_compute_retry: String,
/// Configure if this is a private access proxy for the POC: In that case the proxy will ignore the IP allowlist
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
is_private_access_proxy: bool,
/// Configure whether all incoming requests have a Proxy Protocol V2 packet.
// TODO(conradludgate): switch default to rejected or required once we've updated all deployments
#[clap(value_enum, long, default_value_t = ProxyProtocolV2::Supported)]
proxy_protocol_v2: ProxyProtocolV2,
/// Time the proxy waits for the webauth session to be confirmed by the control plane.
// TODO: rename to `console_redirect_confirmation_timeout`.
#[clap(long, default_value = "2m", value_parser = humantime::parse_duration)]
webauth_confirmation_timeout: std::time::Duration,
}
#[derive(clap::Args, Clone, Copy, Debug)]
struct SqlOverHttpArgs {
/// timeout for http connection requests
#[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
sql_over_http_timeout: tokio::time::Duration,
/// Whether the SQL over http pool is opt-in
#[clap(long, default_value_t = true, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
sql_over_http_pool_opt_in: bool,
/// How many connections to pool for each endpoint. Excess connections are discarded
#[clap(long, default_value_t = 20)]
sql_over_http_pool_max_conns_per_endpoint: usize,
/// How many connections to pool for each endpoint. Excess connections are discarded
#[clap(long, default_value_t = 20000)]
sql_over_http_pool_max_total_conns: usize,
/// How long pooled connections should remain idle for before closing
#[clap(long, default_value = "5m", value_parser = humantime::parse_duration)]
sql_over_http_idle_timeout: tokio::time::Duration,
/// Duration each shard will wait on average before a GC sweep.
/// A longer time will causes sweeps to take longer but will interfere less frequently.
#[clap(long, default_value = "10m", value_parser = humantime::parse_duration)]
sql_over_http_pool_gc_epoch: tokio::time::Duration,
/// How many shards should the global pool have. Must be a power of two.
/// More shards will introduce less contention for pool operations, but can
/// increase memory used by the pool
#[clap(long, default_value_t = 128)]
sql_over_http_pool_shards: usize,
#[clap(long, default_value_t = 10000)]
sql_over_http_client_conn_threshold: u64,
#[clap(long, default_value_t = 64)]
sql_over_http_cancel_set_shards: usize,
#[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB
sql_over_http_max_request_size_bytes: u64,
#[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB
sql_over_http_max_response_size_bytes: usize,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let _logging_guard = proxy::logging::init().await?;
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
let args = ProxyCliArgs::parse();
let region_provider =
RegionProviderChain::default_provider().or_else(Region::new(args.aws_region.clone()));
let provider_conf =
ProviderConfig::without_region().with_region(region_provider.region().await);
let aws_credentials_provider = {
// uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
CredentialsProviderChain::first_try("env", EnvironmentVariableCredentialsProvider::new())
// uses "AWS_PROFILE" / `aws sso login --profile <profile>`
.or_else(
"profile-sso",
ProfileFileCredentialsProvider::builder()
.configure(&provider_conf)
.build(),
)
// uses "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
// needed to access remote extensions bucket
.or_else(
"token",
WebIdentityTokenCredentialsProvider::builder()
.configure(&provider_conf)
.build(),
)
// uses imds v2
.or_else("imds", ImdsCredentialsProvider::builder().build())
};
let elasticache_credentials_provider = Arc::new(elasticache::CredentialsProvider::new(
elasticache::AWSIRSAConfig::new(
args.aws_region.clone(),
args.redis_cluster_name,
args.redis_user_id,
),
aws_credentials_provider,
));
let regional_redis_client = match (args.redis_auth_type.as_str(), &args.redis_notifications) {
("plain", redis_url) => match redis_url {
None => {
bail!("plain auth requires redis_notifications to be set");
}
Some(url) => Some(
ConnectionWithCredentialsProvider::new_with_static_credentials(url.to_string()),
),
},
("irsa", _) => match (&args.redis_host, args.redis_port) {
(Some(host), Some(port)) => Some(
ConnectionWithCredentialsProvider::new_with_credentials_provider(
host.to_string(),
port,
elasticache_credentials_provider.clone(),
),
),
(None, None) => {
warn!("irsa auth requires redis-host and redis-port to be set, continuing without regional_redis_client");
None
}
_ => {
bail!("redis-host and redis-port must be specified together");
}
},
_ => {
bail!("unknown auth type given");
}
};
let endpoint_cache_config: config::EndpointCacheConfig = args.endpoint_cache_config.parse()?;
let Some(mut regional_redis_client) = regional_redis_client else {
bail!("no regional_redis_client");
};
if let Err(e) = regional_redis_client.connect().await {
bail!("error connecting to redis: {:?}", e);
}
let mut last_id = "0-0".to_string();
batch_read(
&mut regional_redis_client,
endpoint_cache_config.stream_name,
StreamReadOptions::default().count(endpoint_cache_config.default_batch_size),
&mut last_id,
true,
|event| {
let json = serde_json::to_string(&event)?;
println!("{}", json);
Ok(())
},
)
.await?;
Ok(())
}
// TODO: this could be an enum, but events in Redis need to be fixed first.
// ProjectCreated was sent with type:branch_created. So we ignore type.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
struct CPlaneEvent {
id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
endpoint_created: Option<EndpointCreated>,
#[serde(skip_serializing_if = "Option::is_none")]
branch_created: Option<BranchCreated>,
#[serde(skip_serializing_if = "Option::is_none")]
project_created: Option<ProjectCreated>,
#[serde(rename = "type")]
_type: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
struct EndpointCreated {
endpoint_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
struct BranchCreated {
branch_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
struct ProjectCreated {
project_id: String,
}
impl TryFrom<&Value> for CPlaneEvent {
type Error = anyhow::Error;
fn try_from(value: &Value) -> Result<Self, Self::Error> {
let json = String::from_redis_value(value)?;
Ok(serde_json::from_str(&json)?)
}
}
async fn batch_read(
conn: &mut ConnectionWithCredentialsProvider,
stream_name: String,
opts: StreamReadOptions,
last_id: &mut String,
return_when_finish: bool,
mut insert_event: impl FnMut(CPlaneEvent) -> anyhow::Result<()>,
) -> anyhow::Result<()> {
let mut total: usize = 0;
loop {
let mut res: StreamReadReply = conn
.xread_options(&[&stream_name], &[last_id.as_str()], &opts)
.await?;
if res.keys.is_empty() {
if return_when_finish {
if total != 0 {
break;
}
anyhow::bail!(
"Redis stream {} is empty, cannot be used to filter endpoints",
stream_name
);
}
// If we are not returning when finish, we should wait for more data.
continue;
}
if res.keys.len() != 1 {
anyhow::bail!("Cannot read from redis stream {}", stream_name);
}
let key = res.keys.pop().expect("Checked length above");
for stream_id in key.ids {
total += 1;
for value in stream_id.map.values() {
match value.try_into() {
Ok::<CPlaneEvent, _>(mut event) => {
event.id = Some(stream_id.id.clone());
insert_event(event)?;
}
Err(err) => {
tracing::error!("error parsing value {value:?}: {err:?}");
}
};
}
if total.is_power_of_two() {
tracing::debug!("endpoints read {}", total);
}
*last_id = stream_id.id;
}
}
tracing::info!("read {} endpoints/branches/projects from redis", total);
Ok(())
}

View File

@@ -80,7 +80,7 @@ impl ConnectionWithCredentialsProvider {
redis::cmd("PING").query_async(con).await
}
pub(crate) async fn connect(&mut self) -> anyhow::Result<()> {
pub async fn connect(&mut self) -> anyhow::Result<()> {
let _guard = self.mutex.lock().await;
if let Some(con) = self.con.as_mut() {
match Self::ping(con).await {

View File

@@ -262,7 +262,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {
// Send requests.
for req in reqgen {
while reply_rx.try_recv().is_ok() {} // discard replies, to avoid blocking
_ = reply_rx.try_recv(); // discard any replies, to avoid blocking
let msg = ProposerAcceptorMessage::AppendRequest(req);
msg_tx.send(msg).await.expect("send failed");
}

View File

@@ -217,8 +217,7 @@ pub static WAL_RECEIVER_QUEUE_DEPTH: Lazy<Histogram> = Lazy::new(|| {
let mut buckets = pow2_buckets(1, MSG_QUEUE_SIZE);
buckets.insert(0, 0.0);
buckets.insert(buckets.len() - 1, (MSG_QUEUE_SIZE - 1) as f64);
// TODO: tweak this.
assert!(buckets.len() <= 16, "too many histogram buckets");
assert!(buckets.len() <= 12, "too many histogram buckets");
register_histogram!(
"safekeeper_wal_receiver_queue_depth",

View File

@@ -7,15 +7,14 @@ use crate::metrics::{
WAL_RECEIVERS, WAL_RECEIVER_QUEUE_DEPTH, WAL_RECEIVER_QUEUE_DEPTH_TOTAL,
WAL_RECEIVER_QUEUE_SIZE_TOTAL,
};
use crate::safekeeper::{
AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
ServerInfo,
};
use crate::safekeeper::AcceptorProposerMessage;
use crate::safekeeper::ProposerAcceptorMessage;
use crate::safekeeper::ServerInfo;
use crate::timeline::WalResidentTimeline;
use crate::wal_service::ConnectionId;
use crate::GlobalTimelines;
use anyhow::{anyhow, Context};
use bytes::{BufMut as _, Bytes, BytesMut};
use bytes::BytesMut;
use parking_lot::MappedMutexGuard;
use parking_lot::Mutex;
use parking_lot::MutexGuard;
@@ -207,8 +206,7 @@ impl Drop for WalReceiverGuard {
}
}
// TODO: reconsider this.
pub const MSG_QUEUE_SIZE: usize = 4096;
pub const MSG_QUEUE_SIZE: usize = 256;
pub const REPLY_QUEUE_SIZE: usize = 16;
impl SafekeeperPostgresHandler {
@@ -486,9 +484,6 @@ const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
/// every 5 seconds, for 12 samples per poll. This will give a count of up to 12x active timelines.
const METRICS_INTERVAL: Duration = Duration::from_secs(5);
/// The AppendRequest buffer size.
const APPEND_BUFFER_SIZE: usize = 1024 * 1024;
/// Encapsulates a task which takes messages from msg_rx, processes and pushes
/// replies to reply_tx.
///
@@ -535,9 +530,6 @@ impl WalAcceptor {
async fn run(&mut self) -> anyhow::Result<()> {
let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
// Buffer AppendRequests to submit them as a single large write.
let mut append_buf = BufferedAppendRequest::new(APPEND_BUFFER_SIZE);
// Periodically flush the WAL and compute metrics.
let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
@@ -554,7 +546,7 @@ impl WalAcceptor {
// Process inbound message.
msg = self.msg_rx.recv() => {
// If disconnected, break to flush WAL and return.
let Some(msg) = msg else {
let Some(mut msg) = msg else {
break;
};
@@ -571,44 +563,11 @@ impl WalAcceptor {
// This batches multiple appends per fsync. If the channel is empty after
// sending the reply, we'll schedule an immediate flush.
if let ProposerAcceptorMessage::AppendRequest(append_request) = msg {
// Try to batch multiple messages into a single large write.
if !append_buf.is_empty() || !self.msg_rx.is_empty() {
if append_buf.add(&append_request) {
continue; // message buffered, go get next message
}
// Full buffer, write it and buffer this message for next iteration.
dirty = true;
let buf_req = append_buf.take().expect("empty buffer");
let buf_msg = ProposerAcceptorMessage::NoFlushAppendRequest(buf_req);
let reply = self.tli.process_msg(&buf_msg).await?;
drop(buf_msg); // allow reusing buffer for add
assert!(append_buf.add(&append_request), "empty buffer rejected msg");
reply
} else {
dirty = true;
let msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
self.tli.process_msg(&msg).await?
}
} else {
self.tli.process_msg(&msg).await?
msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
dirty = true;
}
}
// If there are no pending messages, write the append buffer.
//
// NB: we don't also flush the WAL here. Otherwise we can get into a regime where we
// quickly drain msg_rx and fsync before the sender is able to repopulate msg_rx.
// This happens consistently due to Tokio scheduling, leading to overeager fsyncing.
// Instead, we perform the write without fsyncing and give the sender a chance to
// get scheduled and populate msg_rx for the next iteration. If there are no further
// messages, the next iteration will flush the WAL.
_ = future::ready(()), if self.msg_rx.is_empty() && !append_buf.is_empty() => {
dirty = true;
let buf_req = append_buf.take().expect("empty buffer");
self.tli
.process_msg(&ProposerAcceptorMessage::NoFlushAppendRequest(buf_req))
.await?
self.tli.process_msg(&msg).await?
}
// While receiving AppendRequests, flush the WAL periodically and respond with an
@@ -620,11 +579,11 @@ impl WalAcceptor {
.await?
}
// If there are no pending messages, flush the WAL and append buffer immediately.
// If there are no pending messages, flush the WAL immediately.
//
// TODO: this should be done via flush_ticker.reset_immediately(), but that's always
// delayed by 1ms due to this bug: https://github.com/tokio-rs/tokio/issues/6866.
_ = future::ready(()), if self.msg_rx.is_empty() && dirty => {
_ = future::ready(()), if dirty && self.msg_rx.is_empty() => {
dirty = false;
flush_ticker.reset();
self.tli
@@ -668,115 +627,3 @@ impl Drop for WalAcceptor {
}
}
}
/// Buffers WAL data for multiple AppendRequests, to submit them as a single write.
struct BufferedAppendRequest {
/// The buffer capacity.
capacity: usize,
/// The buffered header and WAL data.
buf: Option<(AppendRequestHeader, BytesMut)>,
/// A previous buffer that can be reused when the returned message is dropped.
reuse_buf: Option<Bytes>,
/// If an AppendRequest is larger than the buffer capacity (when empty), just stash it here to
/// avoid growing the buffer and copying it. This will be returned as-is.
large: Option<AppendRequest>,
}
impl BufferedAppendRequest {
/// Creates a new append request buffer with the given capacity.
fn new(capacity: usize) -> Self {
Self {
capacity,
buf: None,
reuse_buf: None,
large: None,
}
}
/// Adds the given append request to the buffer, if possible. Returns `false` if the message
/// can't be buffered, leaving self unmodified. An empty buffer will always accept a message.
///
/// If the buffer is not empty, the message must have the same term and proposer and contiguous
/// `begin_lsn` and `end_lsn`. The buffer must have available capacity for the entire
/// `wal_data`. If the message is greater than an empty buffer's capacity, it is accepted but
/// simply stashed away in `large` without growing the buffer.
pub fn add(&mut self, msg: &AppendRequest) -> bool {
// If there is a stashed large message, reject further messages.
if self.large.is_some() {
return false;
}
// If there is no existing buffer, initialize one with the message.
let Some((ref mut h, ref mut wal_data)) = self.buf else {
// If the message is larger than the buffer capacity, just stash it instead of growing.
if msg.wal_data.len() > self.capacity {
assert!(self.large.is_none());
self.large = Some(msg.clone()); // clone is cheap with Bytes
return true;
}
// Reuse a previous buffer, if any, or allocate a new one.
//
// TODO: try_into_mut() is essentially runtime borrow checking. If AppendRequest used a
// normal Vec<u8> we could do compile-time borrow checking instead and avoid panic.
let mut wal_data = match self.reuse_buf.take() {
Some(reuse_buf) => match reuse_buf.try_into_mut() {
Ok(mut reuse_buf) => {
assert_eq!(reuse_buf.capacity(), self.capacity);
reuse_buf.clear();
reuse_buf
}
Err(_) => panic!("couldn't reuse buffer, still in use"),
},
None => BytesMut::with_capacity(self.capacity),
};
// Copy the append request into the buffer.
wal_data.put_slice(&msg.wal_data);
self.buf = Some((msg.h, wal_data));
return true;
};
// The messages must have the same term and proposer.
if h.term != msg.h.term || h.proposer_uuid != msg.h.proposer_uuid {
return false;
}
// The messages must be contiguous.
if h.end_lsn != msg.h.begin_lsn {
return false;
}
// The message must fit in the buffer.
if wal_data.len() + msg.wal_data.len() > self.capacity {
return false;
}
// Add the message to the buffer, bumping the commit and truncate LSNs. We assume that later
// messages have later commit/truncate LSNs.
h.end_lsn = msg.h.end_lsn;
h.commit_lsn = msg.h.commit_lsn;
h.truncate_lsn = msg.h.truncate_lsn;
wal_data.put_slice(&msg.wal_data);
true
}
/// Returns true if there is no buffered message.
fn is_empty(&self) -> bool {
self.buf.is_none() && self.large.is_none()
}
/// Takes the buffered AppendRequest (if any), leaving a None in its place.
///
/// NB: The returned `wal_data` Bytes must be dropped before the next call to `add()`, in order
/// to reuse the buffer. This is basically runtime borrow checking, because of Bytes.
fn take(&mut self) -> Option<AppendRequest> {
// If there is a stashed large message, return it.
if let Some(large) = self.large.take() {
assert!(self.buf.is_none(), "both buf and large are set");
return Some(large);
}
let (h, wal_data) = self.buf.take()?;
let wal_data = wal_data.freeze();
self.reuse_buf = Some(wal_data.clone()); // keep a reference to the buffer
Some(AppendRequest { h, wal_data })
}
}

View File

@@ -296,13 +296,12 @@ pub struct ProposerElected {
/// Request with WAL message sent from proposer to safekeeper. Along the way it
/// communicates commit_lsn.
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct AppendRequest {
pub h: AppendRequestHeader,
pub wal_data: Bytes,
}
#[derive(Debug, Clone, Copy, Deserialize)]
#[derive(Debug, Clone, Deserialize)]
pub struct AppendRequestHeader {
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
pub term: Term,
@@ -1167,7 +1166,7 @@ mod tests {
proposer_uuid: [0; 16],
};
let mut append_request = AppendRequest {
h: ar_hdr,
h: ar_hdr.clone(),
wal_data: Bytes::from_static(b"b"),
};
@@ -1241,7 +1240,7 @@ mod tests {
proposer_uuid: [0; 16],
};
let append_request = AppendRequest {
h: ar_hdr,
h: ar_hdr.clone(),
wal_data: Bytes::from_static(b"b"),
};
@@ -1249,7 +1248,7 @@ mod tests {
sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
.await
.unwrap();
let mut ar_hrd2 = ar_hdr;
let mut ar_hrd2 = ar_hdr.clone();
ar_hrd2.begin_lsn = Lsn(4);
ar_hrd2.end_lsn = Lsn(5);
let append_request = AppendRequest {

View File

@@ -127,23 +127,29 @@ pub struct PhysicalStorage {
/// - doesn't point to the end of the segment
file: Option<File>,
/// When false, we have just initialized storage using the LSN from find_end_of_wal().
/// In this case, [`write_lsn`] can be less than actually written WAL on disk. In particular,
/// there can be a case with unexpected .partial file.
/// When true, WAL truncation potentially has been interrupted and we need
/// to finish it before allowing WAL writes; see truncate_wal for details.
/// In this case [`write_lsn`] can be less than actually written WAL on
/// disk. In particular, there can be a case with unexpected .partial file.
///
/// Imagine the following:
/// - 000000010000000000000001
/// - it was fully written, but the last record is split between 2 segments
/// - after restart, `find_end_of_wal()` returned 0/1FFFFF0, which is in the end of this segment
/// - `write_lsn`, `write_record_lsn` and `flush_record_lsn` were initialized to 0/1FFFFF0
/// - it was fully written, but the last record is split between 2
/// segments
/// - after restart, `find_end_of_wal()` returned 0/1FFFFF0, which is in
/// the end of this segment
/// - `write_lsn`, `write_record_lsn` and `flush_record_lsn` were
/// initialized to 0/1FFFFF0
/// - 000000010000000000000002.partial
/// - it has only 1 byte written, which is not enough to make a full WAL record
/// - it has only 1 byte written, which is not enough to make a full WAL
/// record
///
/// Partial segment 002 has no WAL records, and it will be removed by the next truncate_wal().
/// This flag will be set to true after the first truncate_wal() call.
/// Partial segment 002 has no WAL records, and it will be removed by the
/// next truncate_wal(). This flag will be set to true after the first
/// truncate_wal() call.
///
/// [`write_lsn`]: Self::write_lsn
is_truncated_after_restart: bool,
pending_wal_truncation: bool,
}
impl PhysicalStorage {
@@ -208,7 +214,7 @@ impl PhysicalStorage {
flush_record_lsn: flush_lsn,
decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000),
file: None,
is_truncated_after_restart: false,
pending_wal_truncation: true,
})
}
@@ -405,6 +411,13 @@ impl Storage for PhysicalStorage {
startpos
);
}
if self.pending_wal_truncation {
bail!(
"write_wal called with pending WAL truncation, write_lsn={}, startpos={}",
self.write_lsn,
startpos
);
}
let write_seconds = time_io_closure(self.write_exact(startpos, buf)).await?;
// WAL is written, updating write metrics
@@ -479,15 +492,34 @@ impl Storage for PhysicalStorage {
);
}
// Quick exit if nothing to do to avoid writing up to 16 MiB of zeros on
// disk (this happens on each connect).
if self.is_truncated_after_restart
// Quick exit if nothing to do and we know that the state is clean to
// avoid writing up to 16 MiB of zeros on disk (this happens on each
// connect).
if !self.pending_wal_truncation
&& end_pos == self.write_lsn
&& end_pos == self.flush_record_lsn
{
return Ok(());
}
// Atomicity: we start with LSNs reset because once on disk deletion is
// started it can't be reversed. However, we might crash/error in the
// middle, leaving garbage above the truncation point. In theory,
// concatenated with previous records it might form bogus WAL (though
// very unlikely in practice because CRC would guard from that). To
// protect, set pending_wal_truncation flag before beginning: it means
// truncation must be retried and WAL writes are prohibited until it
// succeeds. Flag is also set on boot because we don't know if the last
// state was clean.
//
// Protocol (HandleElected before first AppendRequest) ensures we'll
// always try to ensure clean truncation before any writes.
self.pending_wal_truncation = true;
self.write_lsn = end_pos;
self.write_record_lsn = end_pos;
self.flush_record_lsn = end_pos;
// Close previously opened file, if any
if let Some(unflushed_file) = self.file.take() {
self.fdatasync_file(&unflushed_file).await?;
@@ -513,11 +545,7 @@ impl Storage for PhysicalStorage {
fs::rename(wal_file_path, wal_file_partial_path).await?;
}
// Update LSNs
self.write_lsn = end_pos;
self.write_record_lsn = end_pos;
self.flush_record_lsn = end_pos;
self.is_truncated_after_restart = true;
self.pending_wal_truncation = false;
Ok(())
}

View File

@@ -5,6 +5,8 @@ from typing import TYPE_CHECKING, cast, final
import requests
from fixtures.log_helper import log
if TYPE_CHECKING:
from typing import Any, Literal, Optional
@@ -30,7 +32,11 @@ class NeonAPI:
kwargs["headers"] = {}
kwargs["headers"]["Authorization"] = f"Bearer {self.__neon_api_key}"
return requests.request(method, f"{self.__neon_api_base_url}{endpoint}", **kwargs)
resp = requests.request(method, f"{self.__neon_api_base_url}{endpoint}", **kwargs)
log.debug("%s %s returned a %d: %s", method, endpoint, resp.status_code, resp.text)
resp.raise_for_status()
return resp
def create_project(
self,
@@ -66,8 +72,6 @@ class NeonAPI:
json=data,
)
assert resp.status_code == 201
return cast("dict[str, Any]", resp.json())
def get_project_details(self, project_id: str) -> dict[str, Any]:
@@ -79,7 +83,7 @@ class NeonAPI:
"Content-Type": "application/json",
},
)
assert resp.status_code == 200
return cast("dict[str, Any]", resp.json())
def delete_project(
@@ -95,8 +99,6 @@ class NeonAPI:
},
)
assert resp.status_code == 200
return cast("dict[str, Any]", resp.json())
def start_endpoint(
@@ -112,8 +114,6 @@ class NeonAPI:
},
)
assert resp.status_code == 200
return cast("dict[str, Any]", resp.json())
def suspend_endpoint(
@@ -129,8 +129,6 @@ class NeonAPI:
},
)
assert resp.status_code == 200
return cast("dict[str, Any]", resp.json())
def restart_endpoint(
@@ -146,8 +144,6 @@ class NeonAPI:
},
)
assert resp.status_code == 200
return cast("dict[str, Any]", resp.json())
def create_endpoint(
@@ -178,8 +174,6 @@ class NeonAPI:
json=data,
)
assert resp.status_code == 201
return cast("dict[str, Any]", resp.json())
def get_connection_uri(
@@ -206,8 +200,6 @@ class NeonAPI:
},
)
assert resp.status_code == 200
return cast("dict[str, Any]", resp.json())
def get_branches(self, project_id: str) -> dict[str, Any]:
@@ -219,8 +211,6 @@ class NeonAPI:
},
)
assert resp.status_code == 200
return cast("dict[str, Any]", resp.json())
def get_endpoints(self, project_id: str) -> dict[str, Any]:
@@ -232,8 +222,6 @@ class NeonAPI:
},
)
assert resp.status_code == 200
return cast("dict[str, Any]", resp.json())
def get_operations(self, project_id: str) -> dict[str, Any]:
@@ -246,8 +234,6 @@ class NeonAPI:
},
)
assert resp.status_code == 200
return cast("dict[str, Any]", resp.json())
def wait_for_operation_to_finish(self, project_id: str):

View File

@@ -35,9 +35,10 @@ from fixtures.pageserver.utils import (
wait_for_upload,
)
from fixtures.remote_storage import (
LocalFsStorage,
RemoteStorageKind,
)
from fixtures.utils import wait_until
from fixtures.utils import run_only_on_default_postgres, wait_until
from fixtures.workload import Workload
if TYPE_CHECKING:
@@ -728,3 +729,68 @@ def test_upgrade_generationless_local_file_paths(
)
# We should download into the same local path we started with
assert os.path.exists(victim_path)
@run_only_on_default_postgres("Only tests index logic")
def test_old_index_time_threshold(
neon_env_builder: NeonEnvBuilder,
):
"""
Exercise pageserver's detection of trying to load an ancient non-latest index.
(see https://github.com/neondatabase/neon/issues/6951)
"""
# Run with local_fs because we will interfere with mtimes by local filesystem access
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
workload = Workload(env, tenant_id, timeline_id)
workload.init()
workload.write_rows(32)
# Remember generation 1's index path
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
index_path = env.pageserver_remote_storage.index_path(tenant_id, timeline_id)
# Increment generation by detaching+attaching, and write+flush some data to get a new remote index
env.storage_controller.tenant_policy_update(tenant_id, {"placement": "Detached"})
env.storage_controller.tenant_policy_update(tenant_id, {"placement": {"Attached": 0}})
env.storage_controller.reconcile_until_idle()
workload.churn_rows(32)
# A new index should have been written
assert env.pageserver_remote_storage.index_path(tenant_id, timeline_id) != index_path
# Hack the mtime on the generation 1 index
log.info(f"Setting old mtime on {index_path}")
os.utime(index_path, times=(time.time(), time.time() - 30 * 24 * 3600))
env.pageserver.allowed_errors.extend(
[
".*Found a newer index while loading an old one.*",
".*Index age exceeds threshold and a newer index exists.*",
]
)
# Detach from storage controller + attach in an old generation directly on the pageserver.
workload.stop()
env.storage_controller.tenant_policy_update(tenant_id, {"placement": "Detached"})
env.storage_controller.reconcile_until_idle()
env.storage_controller.tenant_policy_update(tenant_id, {"scheduling": "Stop"})
env.storage_controller.allowed_errors.append(".*Scheduling is disabled by policy")
# The controller would not do this (attach in an old generation): we are doing it to simulate
# a hypothetical profound bug in the controller.
env.pageserver.http_client().tenant_location_conf(
tenant_id, {"generation": 1, "mode": "AttachedSingle", "tenant_conf": {}}
)
# The pageserver should react to this situation by refusing to attach the tenant and putting
# it into Broken state
env.pageserver.allowed_errors.append(".*tenant is broken.*")
with pytest.raises(
PageserverApiException,
match="tenant is broken: Index age exceeds threshold and a newer index exists",
):
env.pageserver.http_client().timeline_detail(tenant_id, timeline_id)

16
vendor/revisions.json vendored
View File

@@ -1,18 +1,18 @@
{
"v17": [
"17.0",
"ae4cc30dba24f3910533e5a48e8103c3f2fff300"
"17.1",
"aa2e29f2b6952140dfe51876bbd11054acae776f"
],
"v16": [
"16.4",
"03b43900edc5d8d6eecec460bfc89aec7174bd84"
"16.5",
"b0b693ea298454e95e6b154780d1fd586a244dfd"
],
"v15": [
"15.8",
"fd631a959049dfe2b82f67409c8b8b0d3e0016d1"
"15.9",
"1feff6b60f07cb71b665d0f5ead71a4320a71743"
],
"v14": [
"14.13",
"de0a000dafc2e66ce2e39282d3aa1c704fe0390e"
"14.14",
"c5e0d642efb02e4bfedc283b0a7707fe6c79cc89"
]
}