mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
Compare commits
4 Commits
skyzh/imag
...
jcsp/storc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1d18b74324 | ||
|
|
3e8bf2159d | ||
|
|
5008324460 | ||
|
|
487f3202fe |
5
.github/workflows/build_and_test.yml
vendored
5
.github/workflows/build_and_test.yml
vendored
@@ -263,8 +263,9 @@ jobs:
|
||||
echo "json=$(jq --compact-output '.' /tmp/benchmark_durations.json)" >> $GITHUB_OUTPUT
|
||||
|
||||
benchmarks:
|
||||
if: github.ref_name == 'main' || contains(github.event.pull_request.labels.*.name, 'run-benchmarks')
|
||||
needs: [ check-permissions, build-and-test-locally, build-build-tools-image, get-benchmarks-durations ]
|
||||
# `!failure() && !cancelled()` is required because the workflow depends on the job that can be skipped: `deploy` in PRs
|
||||
if: github.ref_name == 'main' || (contains(github.event.pull_request.labels.*.name, 'run-benchmarks') && !failure() && !cancelled())
|
||||
needs: [ check-permissions, build-build-tools-image, get-benchmarks-durations, deploy ]
|
||||
permissions:
|
||||
id-token: write # aws-actions/configure-aws-credentials
|
||||
statuses: write
|
||||
|
||||
77
Cargo.lock
generated
77
Cargo.lock
generated
@@ -2398,9 +2398,9 @@ checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
|
||||
|
||||
[[package]]
|
||||
name = "futures-timer"
|
||||
version = "3.0.2"
|
||||
version = "3.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
|
||||
checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24"
|
||||
|
||||
[[package]]
|
||||
name = "futures-util"
|
||||
@@ -2503,6 +2503,27 @@ version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
|
||||
|
||||
[[package]]
|
||||
name = "governor"
|
||||
version = "0.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "842dc78579ce01e6a1576ad896edc92fca002dd60c9c3746b7fc2bec6fb429d0"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"dashmap 6.1.0",
|
||||
"futures-sink",
|
||||
"futures-timer",
|
||||
"futures-util",
|
||||
"no-std-compat",
|
||||
"nonzero_ext",
|
||||
"parking_lot 0.12.1",
|
||||
"portable-atomic",
|
||||
"quanta",
|
||||
"rand 0.8.5",
|
||||
"smallvec",
|
||||
"spinning_top",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "group"
|
||||
version = "0.12.1"
|
||||
@@ -3702,6 +3723,12 @@ dependencies = [
|
||||
"memoffset 0.9.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "no-std-compat"
|
||||
version = "0.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c"
|
||||
|
||||
[[package]]
|
||||
name = "nom"
|
||||
version = "7.1.3"
|
||||
@@ -3712,6 +3739,12 @@ dependencies = [
|
||||
"minimal-lexical",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nonzero_ext"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
|
||||
|
||||
[[package]]
|
||||
name = "notify"
|
||||
version = "8.0.0"
|
||||
@@ -4570,6 +4603,12 @@ dependencies = [
|
||||
"never-say-never",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "portable-atomic"
|
||||
version = "1.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "280dc24453071f1b63954171985a0b0d30058d287960968b9b2aca264c8d4ee6"
|
||||
|
||||
[[package]]
|
||||
name = "postgres"
|
||||
version = "0.19.7"
|
||||
@@ -5036,6 +5075,21 @@ dependencies = [
|
||||
"zerocopy",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quanta"
|
||||
version = "0.12.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3bd1fe6824cea6538803de3ff1bc0cf3949024db3d43c9643024bfb33a807c0e"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
"libc",
|
||||
"once_cell",
|
||||
"raw-cpuid",
|
||||
"wasi 0.11.0+wasi-snapshot-preview1",
|
||||
"web-sys",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quick-xml"
|
||||
version = "0.26.0"
|
||||
@@ -5166,6 +5220,15 @@ dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "raw-cpuid"
|
||||
version = "11.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c6928fa44c097620b706542d428957635951bade7143269085389d42c8a4927e"
|
||||
dependencies = [
|
||||
"bitflags 2.8.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rayon"
|
||||
version = "1.7.0"
|
||||
@@ -6374,6 +6437,15 @@ version = "0.9.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
|
||||
|
||||
[[package]]
|
||||
name = "spinning_top"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300"
|
||||
dependencies = [
|
||||
"lock_api",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "spki"
|
||||
version = "0.6.0"
|
||||
@@ -6449,6 +6521,7 @@ dependencies = [
|
||||
"diesel_migrations",
|
||||
"fail",
|
||||
"futures",
|
||||
"governor",
|
||||
"hex",
|
||||
"http-utils",
|
||||
"humantime",
|
||||
|
||||
@@ -283,53 +283,6 @@ pub(crate) enum SpawnMode {
|
||||
Lazy,
|
||||
}
|
||||
|
||||
/// A notifier that can be used to trigger compaction shared by all timelines of a tenant.
|
||||
///
|
||||
/// It is used to notify the compaction loop that there is work to be done. It is also used
|
||||
/// to balance the load of compaction between timelines. If there are pending L0 compaction in
|
||||
/// one of the timeline, it could preempt long-running compaction jobs (e.g., image compaction)
|
||||
/// on other timelines.
|
||||
pub struct CompactionNotifier {
|
||||
notify: Notify,
|
||||
l0_count: std::sync::Mutex<HashMap<TimelineId, usize>>,
|
||||
}
|
||||
|
||||
impl CompactionNotifier {
|
||||
#[allow(clippy::new_without_default)]
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
notify: Notify::new(),
|
||||
l0_count: std::sync::Mutex::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn notify_one(&self) {
|
||||
self.notify.notify_one();
|
||||
}
|
||||
|
||||
pub fn notified(&self) -> tokio::sync::futures::Notified<'_> {
|
||||
self.notify.notified()
|
||||
}
|
||||
|
||||
pub fn on_l0_update(&self, timeline_id: TimelineId, l0_count: usize) {
|
||||
let mut guard = self.l0_count.lock().unwrap();
|
||||
guard.insert(timeline_id, l0_count);
|
||||
}
|
||||
|
||||
pub fn on_shutdown(&self, timeline_id: TimelineId) {
|
||||
let mut guard = self.l0_count.lock().unwrap();
|
||||
guard.remove(&timeline_id);
|
||||
}
|
||||
|
||||
pub fn get_max_l0_count(&self) -> Option<(usize, TimelineId)> {
|
||||
let guard = self.l0_count.lock().unwrap();
|
||||
guard
|
||||
.iter()
|
||||
.max_by_key(|(_, count)| *count)
|
||||
.map(|(timeline_id, count)| (*count, *timeline_id))
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Tenant consists of multiple timelines. Keep them in a hash table.
|
||||
///
|
||||
@@ -405,7 +358,7 @@ pub struct Tenant {
|
||||
compaction_circuit_breaker: std::sync::Mutex<CircuitBreaker>,
|
||||
|
||||
/// Signals the tenant compaction loop that there is L0 compaction work to be done.
|
||||
pub(crate) l0_compaction_trigger: Arc<CompactionNotifier>,
|
||||
pub(crate) l0_compaction_trigger: Arc<Notify>,
|
||||
|
||||
/// Scheduled gc-compaction tasks.
|
||||
scheduled_compaction_tasks: std::sync::Mutex<HashMap<TimelineId, Arc<GcCompactionQueue>>>,
|
||||
@@ -4289,7 +4242,7 @@ impl Tenant {
|
||||
// use an extremely long backoff.
|
||||
Some(Duration::from_secs(3600 * 24)),
|
||||
)),
|
||||
l0_compaction_trigger: Arc::new(CompactionNotifier::new()),
|
||||
l0_compaction_trigger: Arc::new(Notify::new()),
|
||||
scheduled_compaction_tasks: Mutex::new(Default::default()),
|
||||
activate_now_sem: tokio::sync::Semaphore::new(0),
|
||||
attach_wal_lag_cooldown: Arc::new(std::sync::OnceLock::new()),
|
||||
|
||||
@@ -47,7 +47,7 @@ use serde_with::serde_as;
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use tokio::runtime::Handle;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::{oneshot, watch};
|
||||
use tokio::sync::{oneshot, watch, Notify};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::critical;
|
||||
@@ -151,8 +151,7 @@ use super::{
|
||||
MaybeOffloaded,
|
||||
};
|
||||
use super::{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf, CompactionNotifier,
|
||||
HeatMapTimeline,
|
||||
debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf, HeatMapTimeline,
|
||||
};
|
||||
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
|
||||
use super::{
|
||||
@@ -225,7 +224,7 @@ pub struct TimelineResources {
|
||||
pub remote_client: RemoteTimelineClient,
|
||||
pub pagestream_throttle: Arc<crate::tenant::throttle::Throttle>,
|
||||
pub pagestream_throttle_metrics: Arc<crate::metrics::tenant_throttling::Pagestream>,
|
||||
pub l0_compaction_trigger: Arc<CompactionNotifier>,
|
||||
pub l0_compaction_trigger: Arc<Notify>,
|
||||
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
|
||||
}
|
||||
|
||||
@@ -429,7 +428,7 @@ pub struct Timeline {
|
||||
compaction_failed: AtomicBool,
|
||||
|
||||
/// Notifies the tenant compaction loop that there is pending L0 compaction work.
|
||||
l0_compaction_trigger: Arc<CompactionNotifier>,
|
||||
l0_compaction_trigger: Arc<Notify>,
|
||||
|
||||
/// Make sure we only have one running gc at a time.
|
||||
///
|
||||
@@ -1966,8 +1965,6 @@ impl Timeline {
|
||||
// ... and inform any waiters for newer LSNs that there won't be any.
|
||||
self.last_record_lsn.shutdown();
|
||||
|
||||
self.l0_compaction_trigger.on_shutdown(self.timeline_id);
|
||||
|
||||
if let ShutdownMode::FreezeAndFlush = mode {
|
||||
let do_flush = if let Some((open, frozen)) = self
|
||||
.layers
|
||||
@@ -4122,8 +4119,6 @@ impl Timeline {
|
||||
if l0_count >= self.get_compaction_threshold() {
|
||||
self.l0_compaction_trigger.notify_one();
|
||||
}
|
||||
self.l0_compaction_trigger
|
||||
.on_l0_update(self.timeline_id, l0_count);
|
||||
|
||||
// Delay the next flush to backpressure if compaction can't keep up. We delay by the
|
||||
// flush duration such that the flush takes 2x as long. This is propagated up to WAL
|
||||
@@ -5072,22 +5067,20 @@ impl Timeline {
|
||||
// image layer generation taking too long time and blocking L0 compaction. So in this
|
||||
// mode, we also inspect the current number of L0 layers and skip image layer generation
|
||||
// if there are too many of them.
|
||||
if let Some((max_num_of_l0_layers, timeline_id)) =
|
||||
self.l0_compaction_trigger.get_max_l0_count()
|
||||
{
|
||||
let image_preempt_threshold = self.get_image_creation_preempt_threshold()
|
||||
* self.get_compaction_threshold();
|
||||
if image_preempt_threshold != 0
|
||||
&& max_num_of_l0_layers >= image_preempt_threshold
|
||||
{
|
||||
tracing::info!(
|
||||
"preempt image layer generation at {lsn} when processing partition {}..{}: too many L0 layers {} on timeline {}",
|
||||
partition.start().unwrap(), partition.end().unwrap(), max_num_of_l0_layers, timeline_id
|
||||
);
|
||||
last_partition_processed = Some(partition.clone());
|
||||
all_generated = false;
|
||||
break;
|
||||
}
|
||||
let num_of_l0_layers = {
|
||||
let layers = self.layers.read().await;
|
||||
layers.layer_map()?.level0_deltas().len()
|
||||
};
|
||||
let image_preempt_threshold = self.get_image_creation_preempt_threshold()
|
||||
* self.get_compaction_threshold();
|
||||
if image_preempt_threshold != 0 && num_of_l0_layers >= image_preempt_threshold {
|
||||
tracing::info!(
|
||||
"preempt image layer generation at {lsn} when processing partition {}..{}: too many L0 layers {}",
|
||||
partition.start().unwrap(), partition.end().unwrap(), num_of_l0_layers
|
||||
);
|
||||
last_partition_processed = Some(partition.clone());
|
||||
all_generated = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -5466,13 +5459,8 @@ impl Timeline {
|
||||
self.remote_client
|
||||
.schedule_compaction_update(&remove_layers, new_deltas)?;
|
||||
|
||||
let l0_count = guard.layer_map()?.level0_deltas().len();
|
||||
|
||||
drop_wlock(guard);
|
||||
|
||||
self.l0_compaction_trigger
|
||||
.on_l0_update(self.timeline_id, l0_count);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -496,7 +496,8 @@ pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool {
|
||||
/// bad storage or bad configuration, and we can't fix that from inside
|
||||
/// a running process.
|
||||
pub(crate) fn on_fatal_io_error(e: &std::io::Error, context: &str) -> ! {
|
||||
tracing::error!("Fatal I/O error: {e}: {context})");
|
||||
let backtrace = std::backtrace::Backtrace::force_capture();
|
||||
tracing::error!("Fatal I/O error: {e}: {context})\n{backtrace}");
|
||||
std::process::abort();
|
||||
}
|
||||
|
||||
@@ -947,13 +948,18 @@ impl VirtualFileInner {
|
||||
where
|
||||
Buf: tokio_epoll_uring::IoBufMut + Send,
|
||||
{
|
||||
let file_guard = match self.lock_file().await {
|
||||
let file_guard = match self
|
||||
.lock_file()
|
||||
.await
|
||||
.maybe_fatal_err("lock_file inside VirtualFileInner::read_at")
|
||||
{
|
||||
Ok(file_guard) => file_guard,
|
||||
Err(e) => return (buf, Err(e)),
|
||||
};
|
||||
|
||||
observe_duration!(StorageIoOperation::Read, {
|
||||
let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
|
||||
let res = res.maybe_fatal_err("io_engine read_at inside VirtualFileInner::read_at");
|
||||
if let Ok(size) = res {
|
||||
STORAGE_IO_SIZE
|
||||
.with_label_values(&[
|
||||
|
||||
@@ -626,7 +626,7 @@ pub fn make_router(
|
||||
failpoints_handler(r, cancel).await
|
||||
})
|
||||
})
|
||||
.get("/v1/uzilization", |r| request_span(r, utilization_handler))
|
||||
.get("/v1/utilization", |r| request_span(r, utilization_handler))
|
||||
.delete("/v1/tenant/:tenant_id", |r| {
|
||||
request_span(r, tenant_delete_handler)
|
||||
})
|
||||
|
||||
@@ -26,6 +26,7 @@ humantime.workspace = true
|
||||
itertools.workspace = true
|
||||
lasso.workspace = true
|
||||
once_cell.workspace = true
|
||||
governor = {version = "0.8.0"}
|
||||
pageserver_api.workspace = true
|
||||
pageserver_client.workspace = true
|
||||
postgres_connection.workspace = true
|
||||
|
||||
@@ -8,6 +8,7 @@ use crate::reconciler::ReconcileError;
|
||||
use crate::service::{LeadershipStatus, Service, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT};
|
||||
use anyhow::Context;
|
||||
use futures::Future;
|
||||
use governor::{Quota, RateLimiter};
|
||||
use http_utils::{
|
||||
endpoint::{self, auth_middleware, check_permission_with, request_span},
|
||||
error::ApiError,
|
||||
@@ -32,6 +33,7 @@ use pageserver_api::models::{
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_client::{mgmt_api, BlockUnblock};
|
||||
use std::num::NonZero;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
@@ -516,6 +518,14 @@ async fn handle_tenant_timeline_block_unblock_gc(
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
static PASSTHROUGH_RATE_LIMITER: std::sync::OnceLock<
|
||||
RateLimiter<
|
||||
TenantId,
|
||||
governor::state::keyed::DefaultKeyedStateStore<TenantId>,
|
||||
governor::clock::DefaultClock,
|
||||
>,
|
||||
> = std::sync::OnceLock::new();
|
||||
|
||||
async fn handle_tenant_timeline_passthrough(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
@@ -537,6 +547,19 @@ async fn handle_tenant_timeline_passthrough(
|
||||
|
||||
tracing::info!("Proxying request for tenant {} ({})", tenant_id, path);
|
||||
|
||||
// Proxied requests are expected to be rare on a per-tenant basis: these are things
|
||||
// like inspecting a timeline's details or doing an LSN<->timestamp mapping. Not anything
|
||||
// that has high throughput.
|
||||
let limiter = PASSTHROUGH_RATE_LIMITER.get_or_init(|| {
|
||||
RateLimiter::new(
|
||||
Quota::per_second(NonZero::new(10).unwrap()),
|
||||
governor::state::keyed::DefaultKeyedStateStore::new(),
|
||||
governor::clock::DefaultClock::default(),
|
||||
)
|
||||
});
|
||||
|
||||
limiter.until_key_ready(&tenant_id).await;
|
||||
|
||||
// Find the node that holds shard zero
|
||||
let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id).await?;
|
||||
|
||||
|
||||
@@ -3189,15 +3189,17 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
assert len(target.get_safekeepers()) == 0
|
||||
|
||||
sk_0 = env.safekeepers[0]
|
||||
|
||||
body = {
|
||||
"active": True,
|
||||
"id": fake_id,
|
||||
"created_at": "2023-10-25T09:11:25Z",
|
||||
"updated_at": "2024-08-28T11:32:43Z",
|
||||
"region_id": "aws-us-east-2",
|
||||
"host": "safekeeper-333.us-east-2.aws.neon.build",
|
||||
"port": 6401,
|
||||
"http_port": 7676,
|
||||
"host": "localhost",
|
||||
"port": sk_0.port.pg,
|
||||
"http_port": sk_0.port.http,
|
||||
"version": 5957,
|
||||
"availability_zone_id": "us-east-2b",
|
||||
}
|
||||
@@ -3243,6 +3245,13 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
|
||||
# Ensure idempotency
|
||||
target.safekeeper_scheduling_policy(inserted["id"], "Decomissioned")
|
||||
|
||||
def storcon_heartbeat():
|
||||
assert env.storage_controller.log_contains(
|
||||
"Heartbeat round complete for 1 safekeepers, 0 offline"
|
||||
)
|
||||
|
||||
wait_until(storcon_heartbeat)
|
||||
|
||||
|
||||
def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
|
||||
compared = [dict(a), dict(b)]
|
||||
|
||||
Reference in New Issue
Block a user