mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-31 12:00:42 +00:00
Compare commits
4 Commits
skyzh/l0-l
...
jcsp/storc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1d18b74324 | ||
|
|
3e8bf2159d | ||
|
|
5008324460 | ||
|
|
487f3202fe |
5
.github/workflows/build_and_test.yml
vendored
5
.github/workflows/build_and_test.yml
vendored
@@ -263,8 +263,9 @@ jobs:
|
||||
echo "json=$(jq --compact-output '.' /tmp/benchmark_durations.json)" >> $GITHUB_OUTPUT
|
||||
|
||||
benchmarks:
|
||||
if: github.ref_name == 'main' || contains(github.event.pull_request.labels.*.name, 'run-benchmarks')
|
||||
needs: [ check-permissions, build-and-test-locally, build-build-tools-image, get-benchmarks-durations ]
|
||||
# `!failure() && !cancelled()` is required because the workflow depends on the job that can be skipped: `deploy` in PRs
|
||||
if: github.ref_name == 'main' || (contains(github.event.pull_request.labels.*.name, 'run-benchmarks') && !failure() && !cancelled())
|
||||
needs: [ check-permissions, build-build-tools-image, get-benchmarks-durations, deploy ]
|
||||
permissions:
|
||||
id-token: write # aws-actions/configure-aws-credentials
|
||||
statuses: write
|
||||
|
||||
77
Cargo.lock
generated
77
Cargo.lock
generated
@@ -2398,9 +2398,9 @@ checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
|
||||
|
||||
[[package]]
|
||||
name = "futures-timer"
|
||||
version = "3.0.2"
|
||||
version = "3.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
|
||||
checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24"
|
||||
|
||||
[[package]]
|
||||
name = "futures-util"
|
||||
@@ -2503,6 +2503,27 @@ version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
|
||||
|
||||
[[package]]
|
||||
name = "governor"
|
||||
version = "0.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "842dc78579ce01e6a1576ad896edc92fca002dd60c9c3746b7fc2bec6fb429d0"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"dashmap 6.1.0",
|
||||
"futures-sink",
|
||||
"futures-timer",
|
||||
"futures-util",
|
||||
"no-std-compat",
|
||||
"nonzero_ext",
|
||||
"parking_lot 0.12.1",
|
||||
"portable-atomic",
|
||||
"quanta",
|
||||
"rand 0.8.5",
|
||||
"smallvec",
|
||||
"spinning_top",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "group"
|
||||
version = "0.12.1"
|
||||
@@ -3702,6 +3723,12 @@ dependencies = [
|
||||
"memoffset 0.9.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "no-std-compat"
|
||||
version = "0.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c"
|
||||
|
||||
[[package]]
|
||||
name = "nom"
|
||||
version = "7.1.3"
|
||||
@@ -3712,6 +3739,12 @@ dependencies = [
|
||||
"minimal-lexical",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nonzero_ext"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
|
||||
|
||||
[[package]]
|
||||
name = "notify"
|
||||
version = "8.0.0"
|
||||
@@ -4570,6 +4603,12 @@ dependencies = [
|
||||
"never-say-never",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "portable-atomic"
|
||||
version = "1.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "280dc24453071f1b63954171985a0b0d30058d287960968b9b2aca264c8d4ee6"
|
||||
|
||||
[[package]]
|
||||
name = "postgres"
|
||||
version = "0.19.7"
|
||||
@@ -5036,6 +5075,21 @@ dependencies = [
|
||||
"zerocopy",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quanta"
|
||||
version = "0.12.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3bd1fe6824cea6538803de3ff1bc0cf3949024db3d43c9643024bfb33a807c0e"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
"libc",
|
||||
"once_cell",
|
||||
"raw-cpuid",
|
||||
"wasi 0.11.0+wasi-snapshot-preview1",
|
||||
"web-sys",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quick-xml"
|
||||
version = "0.26.0"
|
||||
@@ -5166,6 +5220,15 @@ dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "raw-cpuid"
|
||||
version = "11.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c6928fa44c097620b706542d428957635951bade7143269085389d42c8a4927e"
|
||||
dependencies = [
|
||||
"bitflags 2.8.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rayon"
|
||||
version = "1.7.0"
|
||||
@@ -6374,6 +6437,15 @@ version = "0.9.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
|
||||
|
||||
[[package]]
|
||||
name = "spinning_top"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300"
|
||||
dependencies = [
|
||||
"lock_api",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "spki"
|
||||
version = "0.6.0"
|
||||
@@ -6449,6 +6521,7 @@ dependencies = [
|
||||
"diesel_migrations",
|
||||
"fail",
|
||||
"futures",
|
||||
"governor",
|
||||
"hex",
|
||||
"http-utils",
|
||||
"humantime",
|
||||
|
||||
@@ -95,8 +95,6 @@ pub struct LayerMap {
|
||||
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
|
||||
///
|
||||
/// NB: make sure to notify `watch_l0_deltas` on changes.
|
||||
/// NB: this is not sorted by LSN, but by the order of insertion; always use the historic layer info to
|
||||
/// retrieve L0 layers in order.
|
||||
l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
|
||||
|
||||
/// Notifies about L0 delta layer changes, sending the current number of L0 layers.
|
||||
|
||||
@@ -5433,8 +5433,7 @@ impl Timeline {
|
||||
// because we have not implemented L0 => L0 compaction.
|
||||
duplicated_layers.insert(l.layer_desc().key());
|
||||
} else if LayerMap::is_l0(&l.layer_desc().key_range, l.layer_desc().is_delta) {
|
||||
// This is not an error any more because we allow L0-L0 compaction.
|
||||
// return Err(CompactionError::Other(anyhow::anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
|
||||
return Err(CompactionError::Other(anyhow::anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
|
||||
} else {
|
||||
insert_layers.push(l.clone());
|
||||
}
|
||||
|
||||
@@ -1199,13 +1199,11 @@ impl Timeline {
|
||||
//
|
||||
// In general, compaction_threshold should be <= compaction_upper_limit, but in case that
|
||||
// the constraint is not respected, we use the larger of the two.
|
||||
let delta_target_size =
|
||||
std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
|
||||
let delta_size_limit = std::cmp::max(
|
||||
self.get_compaction_upper_limit(),
|
||||
self.get_compaction_threshold(),
|
||||
) as u64
|
||||
* delta_target_size;
|
||||
* std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
|
||||
|
||||
let mut fully_compacted = true;
|
||||
|
||||
@@ -1243,21 +1241,12 @@ impl Timeline {
|
||||
end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
|
||||
};
|
||||
|
||||
// If the total size of the deltas to compact is less than the target size, we produce the newly-generated layers
|
||||
// as L0 files. Otherwise, we produce L1 files.
|
||||
let l0_to_l0_compaction = deltas_to_compact
|
||||
.iter()
|
||||
.map(|l| l.metadata().file_size)
|
||||
.sum::<u64>()
|
||||
<= delta_target_size;
|
||||
|
||||
info!(
|
||||
"Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total), l0_to_l0_compaction={}",
|
||||
"Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
|
||||
lsn_range.start,
|
||||
lsn_range.end,
|
||||
deltas_to_compact.len(),
|
||||
level0_deltas.len(),
|
||||
l0_to_l0_compaction
|
||||
level0_deltas.len()
|
||||
);
|
||||
|
||||
for l in deltas_to_compact.iter() {
|
||||
@@ -1513,8 +1502,7 @@ impl Timeline {
|
||||
dup_start_lsn = dup_end_lsn;
|
||||
dup_end_lsn = lsn_range.end;
|
||||
}
|
||||
if writer.is_some() && !l0_to_l0_compaction {
|
||||
// L0-L0 compaction ONLY produces one layer.
|
||||
if writer.is_some() {
|
||||
let written_size = writer.as_mut().unwrap().size();
|
||||
let contains_hole =
|
||||
next_hole < holes.len() && key >= holes[next_hole].key_range.end;
|
||||
@@ -1564,7 +1552,7 @@ impl Timeline {
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
if l0_to_l0_compaction { Key::MIN } else { key },
|
||||
key,
|
||||
if dup_end_lsn.is_valid() {
|
||||
// this is a layer containing slice of values of the same key
|
||||
debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
|
||||
@@ -1603,14 +1591,7 @@ impl Timeline {
|
||||
}
|
||||
if let Some(writer) = writer {
|
||||
let (desc, path) = writer
|
||||
.finish(
|
||||
if l0_to_l0_compaction {
|
||||
Key::MAX
|
||||
} else {
|
||||
prev_key.unwrap().next()
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
.finish(prev_key.unwrap().next(), ctx)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?;
|
||||
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
|
||||
|
||||
@@ -496,7 +496,8 @@ pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool {
|
||||
/// bad storage or bad configuration, and we can't fix that from inside
|
||||
/// a running process.
|
||||
pub(crate) fn on_fatal_io_error(e: &std::io::Error, context: &str) -> ! {
|
||||
tracing::error!("Fatal I/O error: {e}: {context})");
|
||||
let backtrace = std::backtrace::Backtrace::force_capture();
|
||||
tracing::error!("Fatal I/O error: {e}: {context})\n{backtrace}");
|
||||
std::process::abort();
|
||||
}
|
||||
|
||||
@@ -947,13 +948,18 @@ impl VirtualFileInner {
|
||||
where
|
||||
Buf: tokio_epoll_uring::IoBufMut + Send,
|
||||
{
|
||||
let file_guard = match self.lock_file().await {
|
||||
let file_guard = match self
|
||||
.lock_file()
|
||||
.await
|
||||
.maybe_fatal_err("lock_file inside VirtualFileInner::read_at")
|
||||
{
|
||||
Ok(file_guard) => file_guard,
|
||||
Err(e) => return (buf, Err(e)),
|
||||
};
|
||||
|
||||
observe_duration!(StorageIoOperation::Read, {
|
||||
let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
|
||||
let res = res.maybe_fatal_err("io_engine read_at inside VirtualFileInner::read_at");
|
||||
if let Ok(size) = res {
|
||||
STORAGE_IO_SIZE
|
||||
.with_label_values(&[
|
||||
|
||||
@@ -626,7 +626,7 @@ pub fn make_router(
|
||||
failpoints_handler(r, cancel).await
|
||||
})
|
||||
})
|
||||
.get("/v1/uzilization", |r| request_span(r, utilization_handler))
|
||||
.get("/v1/utilization", |r| request_span(r, utilization_handler))
|
||||
.delete("/v1/tenant/:tenant_id", |r| {
|
||||
request_span(r, tenant_delete_handler)
|
||||
})
|
||||
|
||||
@@ -26,6 +26,7 @@ humantime.workspace = true
|
||||
itertools.workspace = true
|
||||
lasso.workspace = true
|
||||
once_cell.workspace = true
|
||||
governor = {version = "0.8.0"}
|
||||
pageserver_api.workspace = true
|
||||
pageserver_client.workspace = true
|
||||
postgres_connection.workspace = true
|
||||
|
||||
@@ -8,6 +8,7 @@ use crate::reconciler::ReconcileError;
|
||||
use crate::service::{LeadershipStatus, Service, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT};
|
||||
use anyhow::Context;
|
||||
use futures::Future;
|
||||
use governor::{Quota, RateLimiter};
|
||||
use http_utils::{
|
||||
endpoint::{self, auth_middleware, check_permission_with, request_span},
|
||||
error::ApiError,
|
||||
@@ -32,6 +33,7 @@ use pageserver_api::models::{
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_client::{mgmt_api, BlockUnblock};
|
||||
use std::num::NonZero;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
@@ -516,6 +518,14 @@ async fn handle_tenant_timeline_block_unblock_gc(
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
static PASSTHROUGH_RATE_LIMITER: std::sync::OnceLock<
|
||||
RateLimiter<
|
||||
TenantId,
|
||||
governor::state::keyed::DefaultKeyedStateStore<TenantId>,
|
||||
governor::clock::DefaultClock,
|
||||
>,
|
||||
> = std::sync::OnceLock::new();
|
||||
|
||||
async fn handle_tenant_timeline_passthrough(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
@@ -537,6 +547,19 @@ async fn handle_tenant_timeline_passthrough(
|
||||
|
||||
tracing::info!("Proxying request for tenant {} ({})", tenant_id, path);
|
||||
|
||||
// Proxied requests are expected to be rare on a per-tenant basis: these are things
|
||||
// like inspecting a timeline's details or doing an LSN<->timestamp mapping. Not anything
|
||||
// that has high throughput.
|
||||
let limiter = PASSTHROUGH_RATE_LIMITER.get_or_init(|| {
|
||||
RateLimiter::new(
|
||||
Quota::per_second(NonZero::new(10).unwrap()),
|
||||
governor::state::keyed::DefaultKeyedStateStore::new(),
|
||||
governor::clock::DefaultClock::default(),
|
||||
)
|
||||
});
|
||||
|
||||
limiter.until_key_ready(&tenant_id).await;
|
||||
|
||||
// Find the node that holds shard zero
|
||||
let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id).await?;
|
||||
|
||||
|
||||
@@ -3189,15 +3189,17 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
assert len(target.get_safekeepers()) == 0
|
||||
|
||||
sk_0 = env.safekeepers[0]
|
||||
|
||||
body = {
|
||||
"active": True,
|
||||
"id": fake_id,
|
||||
"created_at": "2023-10-25T09:11:25Z",
|
||||
"updated_at": "2024-08-28T11:32:43Z",
|
||||
"region_id": "aws-us-east-2",
|
||||
"host": "safekeeper-333.us-east-2.aws.neon.build",
|
||||
"port": 6401,
|
||||
"http_port": 7676,
|
||||
"host": "localhost",
|
||||
"port": sk_0.port.pg,
|
||||
"http_port": sk_0.port.http,
|
||||
"version": 5957,
|
||||
"availability_zone_id": "us-east-2b",
|
||||
}
|
||||
@@ -3243,6 +3245,13 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
|
||||
# Ensure idempotency
|
||||
target.safekeeper_scheduling_policy(inserted["id"], "Decomissioned")
|
||||
|
||||
def storcon_heartbeat():
|
||||
assert env.storage_controller.log_contains(
|
||||
"Heartbeat round complete for 1 safekeepers, 0 offline"
|
||||
)
|
||||
|
||||
wait_until(storcon_heartbeat)
|
||||
|
||||
|
||||
def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
|
||||
compared = [dict(a), dict(b)]
|
||||
|
||||
Reference in New Issue
Block a user