Compare commits

...

23 Commits

Author SHA1 Message Date
Arpad Müller
c660d787e4 Fixes due to merge 2025-01-27 20:05:11 +01:00
Arpad Müller
ad56b9f76b Merge remote-tracking branch 'origin/main' into arpad/sk_timelines_schema 2025-01-27 19:57:34 +01:00
John Spray
aec92bfc34 pageserver: decrease utilization MAX_SHARDS (#10489)
## Problem

The intent of this parameter is to have pageservers consider themselves
"full" if they've got lots of shards, even if they have plenty of
capacity. It works, but because we typically successfully oversubscribe
capacity up to 200%, the MAX_SHARDS limit is effectively doubled, so
this 20,000 value ends up meaning 40,000, whereas the original intent
was to limit nodes to ~10000 shards.

## Summary of changes

- Change MAX_SHARDS to 5000, so that a node with 5000 will get a 100%
utilization, which is equivalent in practice to being considered "half
full" by the storage controller in capacity terms.

This is all a bit subtle and indiret. Originally the limit was baked
into the pageserver with the idea that the pageserver knows better what
its own resources tolerate than the storage controller does, but in
practice it would be probably be easier to understand all this if we
just did it controller-side. So there's scope to refactor here in
future.
2025-01-27 17:03:32 +00:00
Arpad Müller
b0b4b7dd8f storcon: switch to diesel-async and tokio-postgres (#10280)
Switches the storcon away from using diesel's synchronous APIs in favour
of `diesel-async`.

Advantages:

* less C dependencies, especially no openssl, which might be behind the
bug: https://github.com/neondatabase/cloud/issues/21010
* Better to only have async than mix of async plus `spawn_blocking`

We had to turn off usage of the connection pool for migrations, as
diesel migrations don't support async APIs. Thus we still use
`spawn_blocking` in that one place. But this is explicitly done in one
of the `diesel-async` examples.
2025-01-27 14:25:11 +00:00
Mikhail Kot
4dd4096f11 Pgbouncer exporter in compute image (#10503)
https://github.com/neondatabase/cloud/issues/19081
Include pgbouncer_exporter in compute image and run it at port 9127
2025-01-27 14:09:21 +00:00
Arpad Müller
1745fe5c65 Add SafekeeperGeneration 2025-01-27 12:57:44 +01:00
Erik Grinaker
be718ed121 pageserver: disable L0 flush stalls, tune delay threshold (#10507)
## Problem

In ingest benchmarks, we see L0 compaction delays of over 10 minutes due
to image compaction. We can't stall L0 flushes for that long.

## Summary of changes

Disable L0 flush stalls, and bump the default L0 flush delay threshold
from 20 to 30 L0 layers.
2025-01-25 16:51:54 +00:00
Arpad Müller
986db002cd Address simple review comments 2025-01-25 03:33:21 +01:00
Arpad Müller
7ec08ee805 deleted_at column instead of new_sk_set 2025-01-23 00:24:58 +01:00
Arpad Müller
e56aa822e1 clippy 2025-01-22 20:35:32 +01:00
Arpad Müller
f0777cf7ac Mark timelines for deletion during tenant deletion 2025-01-22 20:08:36 +01:00
Arpad Müller
a63153f4bc Optional get_timeline result 2025-01-22 20:08:36 +01:00
Arpad Müller
00380cedd7 Add support for timeline deletion 2025-01-22 20:08:36 +01:00
Arpad Müller
bf0d53aa2d Implement the creation part of the reconciler 2025-01-22 20:08:36 +01:00
Arpad Müller
ebe9ba0cdf Add status_kind 2025-01-22 20:08:36 +01:00
Arpad Müller
1ffe95c837 remove line 2025-01-22 20:08:36 +01:00
Arpad Müller
b5c29806f0 Draft for a reconciler 2025-01-22 20:08:36 +01:00
Arpad Müller
f0fe5fae6b persist 2025-01-22 20:08:36 +01:00
Arpad Müller
e805058364 Move to different function, clears up things a little 2025-01-22 20:08:36 +01:00
Arpad Müller
78c4a82331 wip 2025-01-22 20:08:36 +01:00
Arpad Müller
e35a726b32 wip 2025-01-22 20:08:34 +01:00
Arpad Müller
3d81af8975 wip 2025-01-22 20:07:10 +01:00
Arpad Müller
7d296b3cea Add schema for timelines table 2025-01-22 20:07:10 +01:00
23 changed files with 1737 additions and 421 deletions

161
Cargo.lock generated
View File

@@ -942,6 +942,18 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b"
[[package]]
name = "bb8"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d89aabfae550a5c44b43ab941844ffcd2e993cb6900b342debf59e9ea74acdb8"
dependencies = [
"async-trait",
"futures-util",
"parking_lot 0.12.1",
"tokio",
]
[[package]]
name = "bcder"
version = "0.7.4"
@@ -1301,7 +1313,7 @@ dependencies = [
"tar",
"thiserror",
"tokio",
"tokio-postgres",
"tokio-postgres 0.7.7",
"tokio-stream",
"tokio-util",
"tower 0.5.2",
@@ -1410,7 +1422,7 @@ dependencies = [
"storage_broker",
"thiserror",
"tokio",
"tokio-postgres",
"tokio-postgres 0.7.7",
"tokio-util",
"toml",
"toml_edit",
@@ -1786,11 +1798,24 @@ dependencies = [
"chrono",
"diesel_derives",
"itoa",
"pq-sys",
"r2d2",
"serde_json",
]
[[package]]
name = "diesel-async"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51a307ac00f7c23f526a04a77761a0519b9f0eb2838ebf5b905a58580095bdcb"
dependencies = [
"async-trait",
"bb8",
"diesel",
"futures-util",
"scoped-futures",
"tokio",
"tokio-postgres 0.7.12",
]
[[package]]
name = "diesel_derives"
version = "2.2.1"
@@ -4042,8 +4067,8 @@ dependencies = [
"pageserver_compaction",
"pin-project-lite",
"postgres",
"postgres-protocol",
"postgres-types",
"postgres-protocol 0.6.4",
"postgres-types 0.2.4",
"postgres_backend",
"postgres_connection",
"postgres_ffi",
@@ -4074,7 +4099,7 @@ dependencies = [
"tokio",
"tokio-epoll-uring",
"tokio-io-timeout",
"tokio-postgres",
"tokio-postgres 0.7.7",
"tokio-stream",
"tokio-tar",
"tokio-util",
@@ -4132,7 +4157,7 @@ dependencies = [
"serde",
"thiserror",
"tokio",
"tokio-postgres",
"tokio-postgres 0.7.7",
"tokio-stream",
"tokio-util",
"utils",
@@ -4438,7 +4463,7 @@ dependencies = [
"futures-util",
"log",
"tokio",
"tokio-postgres",
"tokio-postgres 0.7.7",
]
[[package]]
@@ -4459,6 +4484,24 @@ dependencies = [
"stringprep",
]
[[package]]
name = "postgres-protocol"
version = "0.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acda0ebdebc28befa84bee35e651e4c5f09073d668c7aed4cf7e23c3cda84b23"
dependencies = [
"base64 0.22.1",
"byteorder",
"bytes",
"fallible-iterator",
"hmac",
"md-5",
"memchr",
"rand 0.8.5",
"sha2",
"stringprep",
]
[[package]]
name = "postgres-protocol2"
version = "0.1.0"
@@ -4482,7 +4525,18 @@ source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#511f
dependencies = [
"bytes",
"fallible-iterator",
"postgres-protocol",
"postgres-protocol 0.6.4",
]
[[package]]
name = "postgres-types"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f66ea23a2d0e5734297357705193335e0a957696f34bed2f2faefacb2fec336f"
dependencies = [
"bytes",
"fallible-iterator",
"postgres-protocol 0.6.7",
]
[[package]]
@@ -4507,7 +4561,7 @@ dependencies = [
"serde",
"thiserror",
"tokio",
"tokio-postgres",
"tokio-postgres 0.7.7",
"tokio-postgres-rustls",
"tokio-rustls 0.26.0",
"tokio-util",
@@ -4522,7 +4576,7 @@ dependencies = [
"itertools 0.10.5",
"once_cell",
"postgres",
"tokio-postgres",
"tokio-postgres 0.7.7",
"url",
]
@@ -4609,15 +4663,6 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "pq-sys"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6cc05d7ea95200187117196eee9edd0644424911821aeb28a18ce60ea0b8793"
dependencies = [
"vcpkg",
]
[[package]]
name = "pq_proto"
version = "0.1.0"
@@ -4625,7 +4670,7 @@ dependencies = [
"byteorder",
"bytes",
"itertools 0.10.5",
"postgres-protocol",
"postgres-protocol 0.6.4",
"rand 0.8.5",
"serde",
"thiserror",
@@ -4873,7 +4918,7 @@ dependencies = [
"tikv-jemalloc-ctl",
"tikv-jemallocator",
"tokio",
"tokio-postgres",
"tokio-postgres 0.7.7",
"tokio-postgres2",
"tokio-rustls 0.26.0",
"tokio-tungstenite 0.21.0",
@@ -4930,17 +4975,6 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "r2d2"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
dependencies = [
"log",
"parking_lot 0.12.1",
"scheduled-thread-pool",
]
[[package]]
name = "rand"
version = "0.7.3"
@@ -5672,7 +5706,7 @@ dependencies = [
"pageserver_api",
"parking_lot 0.12.1",
"postgres",
"postgres-protocol",
"postgres-protocol 0.6.4",
"postgres_backend",
"postgres_ffi",
"pprof",
@@ -5696,7 +5730,7 @@ dependencies = [
"tikv-jemallocator",
"tokio",
"tokio-io-timeout",
"tokio-postgres",
"tokio-postgres 0.7.7",
"tokio-stream",
"tokio-tar",
"tokio-util",
@@ -5755,12 +5789,12 @@ dependencies = [
]
[[package]]
name = "scheduled-thread-pool"
version = "0.2.7"
name = "scoped-futures"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
checksum = "1b24aae2d0636530f359e9d5ef0c04669d11c5e756699b27a6a6d845d8329091"
dependencies = [
"parking_lot 0.12.1",
"pin-project-lite",
]
[[package]]
@@ -6295,6 +6329,7 @@ dependencies = [
"clap",
"control_plane",
"diesel",
"diesel-async",
"diesel_migrations",
"fail",
"futures",
@@ -6309,10 +6344,12 @@ dependencies = [
"pageserver_api",
"pageserver_client",
"postgres_connection",
"r2d2",
"rand 0.8.5",
"reqwest",
"routerify",
"safekeeper_api",
"safekeeper_client",
"scoped-futures",
"scopeguard",
"serde",
"serde_json",
@@ -6365,7 +6402,7 @@ dependencies = [
"serde_json",
"storage_controller_client",
"tokio",
"tokio-postgres",
"tokio-postgres 0.7.7",
"tokio-postgres-rustls",
"tokio-stream",
"tokio-util",
@@ -6824,13 +6861,39 @@ dependencies = [
"percent-encoding",
"phf",
"pin-project-lite",
"postgres-protocol",
"postgres-types",
"postgres-protocol 0.6.4",
"postgres-types 0.2.4",
"socket2",
"tokio",
"tokio-util",
]
[[package]]
name = "tokio-postgres"
version = "0.7.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b5d3742945bc7d7f210693b0c58ae542c6fd47b17adbbda0885f3dcb34a6bdb"
dependencies = [
"async-trait",
"byteorder",
"bytes",
"fallible-iterator",
"futures-channel",
"futures-util",
"log",
"parking_lot 0.12.1",
"percent-encoding",
"phf",
"pin-project-lite",
"postgres-protocol 0.6.7",
"postgres-types 0.2.8",
"rand 0.8.5",
"socket2",
"tokio",
"tokio-util",
"whoami",
]
[[package]]
name = "tokio-postgres-rustls"
version = "0.12.0"
@@ -6840,7 +6903,7 @@ dependencies = [
"ring",
"rustls 0.23.18",
"tokio",
"tokio-postgres",
"tokio-postgres 0.7.7",
"tokio-rustls 0.26.0",
"x509-certificate",
]
@@ -7498,12 +7561,6 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "version_check"
version = "0.9.4"
@@ -7523,7 +7580,7 @@ dependencies = [
"serde_json",
"sysinfo",
"tokio",
"tokio-postgres",
"tokio-postgres 0.7.7",
"tokio-util",
"tracing",
"tracing-subscriber",

View File

@@ -64,8 +64,6 @@ CARGO_BUILD_FLAGS += $(filter -j1,$(MAKEFLAGS))
CARGO_CMD_PREFIX += $(if $(filter n,$(MAKEFLAGS)),,+)
# Force cargo not to print progress bar
CARGO_CMD_PREFIX += CARGO_TERM_PROGRESS_WHEN=never CI=1
# Set PQ_LIB_DIR to make sure `storage_controller` get linked with bundled libpq (through diesel)
CARGO_CMD_PREFIX += PQ_LIB_DIR=$(POSTGRES_INSTALL_DIR)/v16/lib
CACHEDIR_TAG_CONTENTS := "Signature: 8a477f597d28d172789f06886806bc55"

View File

@@ -1266,11 +1266,12 @@ RUN set -e \
#########################################################################################
#
# Layers "postgres-exporter" and "sql-exporter"
# Layers "postgres-exporter", "pgbouncer-exporter", and "sql-exporter"
#
#########################################################################################
FROM quay.io/prometheuscommunity/postgres-exporter:v0.16.0 AS postgres-exporter
FROM quay.io/prometheuscommunity/pgbouncer-exporter:v0.10.2 AS pgbouncer-exporter
# Keep the version the same as in build-tools.Dockerfile and
# test_runner/regress/test_compute_metrics.py.
@@ -1402,6 +1403,7 @@ RUN mkdir -p /etc/local_proxy && chown postgres:postgres /etc/local_proxy
# Metrics exporter binaries and configuration files
COPY --from=postgres-exporter /bin/postgres_exporter /bin/postgres_exporter
COPY --from=pgbouncer-exporter /bin/pgbouncer_exporter /bin/pgbouncer_exporter
COPY --from=sql-exporter /bin/sql_exporter /bin/sql_exporter
COPY --chown=postgres compute/etc/postgres_exporter.yml /etc/postgres_exporter.yml

View File

@@ -19,6 +19,8 @@ max_prepared_statements=0
admin_users=postgres
unix_socket_dir=/tmp/
unix_socket_mode=0777
; required for pgbouncer_exporter
ignore_startup_parameters=extra_float_digits
;; Disable connection logging. It produces a lot of logs that no one looks at,
;; and we can get similar log entries from the proxy too. We had incidents in

View File

@@ -27,6 +27,10 @@ commands:
user: nobody
sysvInitAction: respawn
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter" /bin/postgres_exporter --config.file=/etc/postgres_exporter.yml'
- name: pgbouncer-exporter
user: postgres
sysvInitAction: respawn
shell: '/bin/pgbouncer_exporter --pgBouncer.connectionString="postgres:///pgbouncer?host=/tmp&port=6432&dbname=pgbouncer&user=pgbouncer"'
- name: sql-exporter
user: nobody
sysvInitAction: respawn

View File

@@ -27,6 +27,10 @@ commands:
user: nobody
sysvInitAction: respawn
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter" /bin/postgres_exporter --config.file=/etc/postgres_exporter.yml'
- name: pgbouncer-exporter
user: postgres
sysvInitAction: respawn
shell: '/bin/pgbouncer_exporter --pgBouncer.connectionString="postgres:///pgbouncer?host=/tmp&port=6432&dbname=pgbouncer&user=pgbouncer"'
- name: sql-exporter
user: nobody
sysvInitAction: respawn

View File

@@ -1104,6 +1104,7 @@ async fn handle_tenant(subcmd: &TenantCmd, env: &mut local_env::LocalEnv) -> any
existing_initdb_timeline_id: None,
pg_version: Some(args.pg_version),
},
safekeepers: None,
},
)
.await?;
@@ -1164,6 +1165,7 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re
existing_initdb_timeline_id: None,
pg_version: Some(args.pg_version),
},
safekeepers: None,
};
let timeline_info = storage_controller
.tenant_timeline_create(tenant_id, create_req)
@@ -1222,6 +1224,7 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re
ancestor_start_lsn: start_lsn,
pg_version: None,
},
safekeepers: None,
};
let timeline_info = storage_controller
.tenant_timeline_create(tenant_id, create_req)

View File

@@ -280,6 +280,18 @@ pub struct TimelineCreateRequest {
pub new_timeline_id: TimelineId,
#[serde(flatten)]
pub mode: TimelineCreateRequestMode,
/// Whether to also create timeline on the safekeepers (specific to storcon API)
pub safekeepers: Option<bool>,
}
/// Storage controller specific extensions to [`TimelineInfo`].
#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineCreateResponseStorcon {
#[serde(flatten)]
pub timeline_info: TimelineInfo,
pub safekeepers: Option<Vec<NodeId>>,
pub safekeepers_generation: Option<u32>,
}
#[derive(Serialize, Deserialize, Clone)]

View File

@@ -19,7 +19,7 @@ pub struct SafekeeperStatus {
pub id: NodeId,
}
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineCreateRequest {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,

View File

@@ -144,6 +144,30 @@ impl Debug for Generation {
}
}
/// Like tenant generations, but for safekeepers
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct SafekeeperGeneration(u32);
impl SafekeeperGeneration {
pub const fn new(v: u32) -> Self {
Self(v)
}
#[track_caller]
pub fn previous(&self) -> Option<Self> {
Some(Self(self.0.checked_sub(1)?))
}
#[track_caller]
pub fn next(&self) -> Self {
Self(self.0 + 1)
}
pub fn into_inner(self) -> u32 {
self.0
}
}
#[cfg(test)]
mod test {
use super::*;

View File

@@ -2168,8 +2168,8 @@ impl Timeline {
}
fn get_l0_flush_delay_threshold(&self) -> Option<usize> {
// Default to delay L0 flushes at 2x compaction threshold.
const DEFAULT_L0_FLUSH_DELAY_FACTOR: usize = 2;
// Default to delay L0 flushes at 3x compaction threshold.
const DEFAULT_L0_FLUSH_DELAY_FACTOR: usize = 3;
// If compaction is disabled, don't delay.
if self.get_compaction_period() == Duration::ZERO {
@@ -2197,8 +2197,10 @@ impl Timeline {
}
fn get_l0_flush_stall_threshold(&self) -> Option<usize> {
// Default to stall L0 flushes at 4x compaction threshold.
const DEFAULT_L0_FLUSH_STALL_FACTOR: usize = 4;
// Default to stall L0 flushes at 5x compaction threshold.
// TODO: stalls are temporarily disabled by default, see below.
#[allow(unused)]
const DEFAULT_L0_FLUSH_STALL_FACTOR: usize = 5;
// If compaction is disabled, don't stall.
if self.get_compaction_period() == Duration::ZERO {
@@ -2230,8 +2232,13 @@ impl Timeline {
return None;
}
let l0_flush_stall_threshold = l0_flush_stall_threshold
.unwrap_or(DEFAULT_L0_FLUSH_STALL_FACTOR * compaction_threshold);
// Disable stalls by default. In ingest benchmarks, we see image compaction take >10
// minutes, blocking L0 compaction, and we can't stall L0 flushes for that long.
//
// TODO: fix this.
// let l0_flush_stall_threshold = l0_flush_stall_threshold
// .unwrap_or(DEFAULT_L0_FLUSH_STALL_FACTOR * compaction_threshold);
let l0_flush_stall_threshold = l0_flush_stall_threshold?;
// 0 disables backpressure.
if l0_flush_stall_threshold == 0 {

View File

@@ -49,7 +49,7 @@ pub(crate) fn regenerate(
};
// Express a static value for how many shards we may schedule on one node
const MAX_SHARDS: u32 = 20000;
const MAX_SHARDS: u32 = 5000;
let mut doc = PageserverUtilization {
disk_usage_bytes: used,

View File

@@ -32,6 +32,8 @@ postgres_connection.workspace = true
rand.workspace = true
reqwest = { workspace = true, features = ["stream"] }
routerify.workspace = true
safekeeper_api.workspace = true
safekeeper_client.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
@@ -45,12 +47,11 @@ strum_macros.workspace = true
diesel = { version = "2.2.6", features = [
"serde_json",
"postgres",
"r2d2",
"chrono",
] }
diesel-async = { version = "0.5.2", features = ["postgres", "bb8", "async-connection-wrapper"] }
diesel_migrations = { version = "2.2.0" }
r2d2 = { version = "0.8.10" }
scoped-futures = "0.1.4"
utils = { path = "../libs/utils/" }
metrics = { path = "../libs/metrics/" }

View File

@@ -0,0 +1 @@
DROP TABLE timelines;

View File

@@ -0,0 +1,12 @@
CREATE TABLE timelines (
tenant_id VARCHAR NOT NULL,
timeline_id VARCHAR NOT NULL,
PRIMARY KEY(tenant_id, timeline_id),
generation INTEGER NOT NULL,
sk_set BIGINT[] NOT NULL,
cplane_notified_generation INTEGER NOT NULL,
status_kind VARCHAR NOT NULL,
status VARCHAR NOT NULL,
deleted_at timestamptz
);
CREATE INDEX timelines_idx ON timelines(status_kind, deleted_at, tenant_id, timeline_id);

View File

@@ -17,6 +17,7 @@ mod pageserver_client;
mod peer_client;
pub mod persistence;
mod reconciler;
mod safekeeper_client;
mod scheduler;
mod schema;
pub mod service;

View File

@@ -10,6 +10,7 @@ use storage_controller::http::make_router;
use storage_controller::metrics::preinitialize_metrics;
use storage_controller::persistence::Persistence;
use storage_controller::service::chaos_injector::ChaosInjector;
use storage_controller::service::safekeeper_reconciler::SafekeeperReconciler;
use storage_controller::service::{
Config, Service, HEARTBEAT_INTERVAL_DEFAULT, LONG_RECONCILE_THRESHOLD_DEFAULT,
MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT,
@@ -308,7 +309,7 @@ async fn async_main() -> anyhow::Result<()> {
// Validate that we can connect to the database
Persistence::await_connection(&secrets.database_url, args.db_connect_timeout.into()).await?;
let persistence = Arc::new(Persistence::new(secrets.database_url));
let persistence = Arc::new(Persistence::new(secrets.database_url).await);
let service = Service::spawn(config, persistence.clone()).await?;
@@ -351,6 +352,24 @@ async fn async_main() -> anyhow::Result<()> {
)
});
const SAFEKEEPER_RECONCILER_INTERVAL: Duration = Duration::from_secs(120);
let safekeeper_reconciler_task = {
let service = service.clone();
let cancel = CancellationToken::new();
let cancel_bg = cancel.clone();
(
tokio::task::spawn(
async move {
let reconciler =
SafekeeperReconciler::new(service, SAFEKEEPER_RECONCILER_INTERVAL);
reconciler.run(cancel_bg).await
}
.instrument(tracing::info_span!("safekeeper_reconciler")),
),
cancel,
)
};
// Wait until we receive a signal
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;
let mut sigquit = tokio::signal::unix::signal(SignalKind::quit())?;
@@ -384,6 +403,11 @@ async fn async_main() -> anyhow::Result<()> {
chaos_cancel.cancel();
chaos_jh.await.ok();
}
// Do the same for the safekeeper reconciler
{
safekeeper_reconciler_task.1.cancel();
_ = safekeeper_reconciler_task.0.await;
}
service.shutdown().await;
tracing::info!("Service shutdown complete");

View File

@@ -80,6 +80,11 @@ pub(crate) struct StorageControllerMetricGroup {
pub(crate) storage_controller_pageserver_request_error:
measured::CounterVec<PageserverRequestLabelGroupSet>,
/// Count of HTTP requests to the safekeeper that resulted in an error,
/// broken down by the safekeeper node id, request name and method
pub(crate) storage_controller_safekeeper_request_error:
measured::CounterVec<PageserverRequestLabelGroupSet>,
/// Latency of HTTP requests to the pageserver, broken down by pageserver
/// node id, request name and method. This include both successful and unsuccessful
/// requests.
@@ -87,6 +92,13 @@ pub(crate) struct StorageControllerMetricGroup {
pub(crate) storage_controller_pageserver_request_latency:
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,
/// Latency of HTTP requests to the safekeeper, broken down by safekeeper
/// node id, request name and method. This include both successful and unsuccessful
/// requests.
#[metric(metadata = histogram::Thresholds::exponential_buckets(0.1, 2.0))]
pub(crate) storage_controller_safekeeper_request_latency:
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,
/// Count of pass-through HTTP requests to the pageserver that resulted in an error,
/// broken down by the pageserver node id, request name and method
pub(crate) storage_controller_passthrough_request_error:

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,94 @@
use crate::metrics::PageserverRequestLabelGroup;
use safekeeper_api::models::{TimelineCreateRequest, TimelineStatus};
use safekeeper_client::mgmt_api::{Client, Result};
use utils::{
id::{NodeId, TenantId, TimelineId},
logging::SecretString,
};
/// Thin wrapper around [`safekeeper_client::mgmt_api::Client`]. It allows the storage
/// controller to collect metrics in a non-intrusive manner.
///
/// Analogous to [`crate::pageserver_client::PageserverClient`].
#[derive(Debug, Clone)]
pub(crate) struct SafekeeperClient {
inner: Client,
node_id_label: String,
}
macro_rules! measured_request {
($name:literal, $method:expr, $node_id: expr, $invoke:expr) => {{
let labels = PageserverRequestLabelGroup {
pageserver_id: $node_id,
path: $name,
method: $method,
};
let latency = &crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_safekeeper_request_latency;
let _timer_guard = latency.start_timer(labels.clone());
let res = $invoke;
if res.is_err() {
let error_counters = &crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_pageserver_request_error;
error_counters.inc(labels)
}
res
}};
}
impl SafekeeperClient {
pub(crate) fn new(
node_id: NodeId,
mgmt_api_endpoint: String,
jwt: Option<SecretString>,
) -> Self {
Self {
inner: Client::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt),
node_id_label: node_id.0.to_string(),
}
}
#[allow(unused)]
pub(crate) fn from_client(
node_id: NodeId,
raw_client: reqwest::Client,
mgmt_api_endpoint: String,
jwt: Option<SecretString>,
) -> Self {
Self {
inner: Client::from_client(raw_client, mgmt_api_endpoint, jwt),
node_id_label: node_id.0.to_string(),
}
}
pub(crate) async fn create_timeline(
&self,
req: &TimelineCreateRequest,
) -> Result<TimelineStatus> {
measured_request!(
"create_timeline",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner.create_timeline(req).await
)
}
pub(crate) async fn delete_timeline(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<TimelineStatus> {
measured_request!(
"delete_timeline",
crate::metrics::Method::Delete,
&self.node_id_label,
self.inner.delete_timeline(tenant_id, timeline_id).await
)
}
}

View File

@@ -58,10 +58,24 @@ diesel::table! {
}
}
diesel::table! {
timelines (tenant_id, timeline_id) {
tenant_id -> Varchar,
timeline_id -> Varchar,
generation -> Int4,
sk_set -> Array<Nullable<Int8>>,
cplane_notified_generation -> Int4,
status_kind -> Varchar,
status -> Varchar,
deleted_at -> Nullable<Timestamptz>,
}
}
diesel::allow_tables_to_appear_in_same_query!(
controllers,
metadata_health,
nodes,
safekeepers,
tenant_shards,
timelines,
);

View File

@@ -1,7 +1,9 @@
pub mod chaos_injector;
mod context_iterator;
pub mod safekeeper_reconciler;
use hyper::Uri;
use safekeeper_api::membership::{MemberSet, SafekeeperId};
use std::{
borrow::Cow,
cmp::Ordering,
@@ -26,21 +28,23 @@ use crate::{
peer_client::GlobalObservedState,
persistence::{
AbortShardSplitStatus, ControllerPersistence, DatabaseResult, MetadataHealthPersistence,
ShardGenerationState, TenantFilter,
SafekeeperPersistence, ShardGenerationState, TenantFilter, TimelinePersistence,
TimelineStatusCreating, TimelineStatusKind,
},
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
safekeeper_client::SafekeeperClient,
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
tenant_shard::{
MigrateAttachment, ObservedStateDelta, ReconcileNeeded, ReconcilerStatus,
ScheduleOptimization, ScheduleOptimizationAction,
},
};
use anyhow::Context;
use anyhow::{anyhow, Context};
use control_plane::storage_controller::{
AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse,
};
use diesel::result::DatabaseErrorKind;
use futures::{stream::FuturesUnordered, StreamExt};
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use itertools::Itertools;
use pageserver_api::{
controller_api::{
@@ -54,7 +58,7 @@ use pageserver_api::{
},
models::{
SecondaryProgress, TenantConfigPatchRequest, TenantConfigRequest,
TimelineArchivalConfigRequest, TopTenantShardsRequest,
TimelineArchivalConfigRequest, TimelineCreateResponseStorcon, TopTenantShardsRequest,
},
};
use reqwest::StatusCode;
@@ -75,14 +79,16 @@ use pageserver_api::{
},
};
use pageserver_client::{mgmt_api, BlockUnblock};
use tokio::sync::mpsc::error::TrySendError;
use tokio::{sync::mpsc::error::TrySendError, task::JoinSet};
use tokio_util::sync::CancellationToken;
use utils::{
backoff,
completion::Barrier,
failpoint_support,
generation::Generation,
generation::{Generation, SafekeeperGeneration},
http::error::ApiError,
id::{NodeId, TenantId, TimelineId},
logging::SecretString,
pausable_failpoint,
sync::gate::Gate,
};
@@ -151,6 +157,7 @@ enum TenantOperations {
SecondaryDownload,
TimelineCreate,
TimelineDelete,
TimelineReconcile,
AttachHook,
TimelineArchivalConfig,
TimelineDetachAncestor,
@@ -3097,6 +3104,10 @@ impl Service {
self.maybe_load_tenant(tenant_id, &_tenant_lock).await?;
self.persistence
.mark_timelines_for_deletion(tenant_id)
.await?;
// Detach all shards. This also deletes local pageserver shard data.
let (detach_waiters, node) = {
let mut detach_waiters = Vec::new();
@@ -3269,25 +3280,11 @@ impl Service {
Ok(())
}
pub(crate) async fn tenant_timeline_create(
async fn tenant_timeline_create_pageservers(
&self,
tenant_id: TenantId,
mut create_req: TimelineCreateRequest,
) -> Result<TimelineInfo, ApiError> {
tracing::info!(
"Creating timeline {}/{}",
tenant_id,
create_req.new_timeline_id,
);
let _tenant_lock = trace_shared_lock(
&self.tenant_op_locks,
tenant_id,
TenantOperations::TimelineCreate,
)
.await;
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
if targets.0.is_empty() {
return Err(ApiError::NotFound(
@@ -3404,8 +3401,261 @@ impl Service {
}
Ok(timeline_info)
}).await?
}
/// reconcile: create timeline on safekeepers
///
/// Assumes tenant lock is held while calling this function
async fn tenant_timeline_create_safekeepers_reconcile(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
timeline_persistence: &TimelinePersistence,
status_creating: &TimelineStatusCreating,
sk_persistences: &HashMap<i64, SafekeeperPersistence>,
) -> Result<(), ApiError> {
// If quorum is reached, return if we are outside of a specified timeout
let jwt = self.config.jwt_token.clone().map(SecretString::from);
let mut joinset = JoinSet::new();
let mut members = Vec::new();
for sk in timeline_persistence.sk_set.iter() {
let Some(sk_p) = sk_persistences.get(sk) else {
return Err(ApiError::InternalServerError(anyhow!(
"couldn't find persisted entry for safekeeper with id {sk}"
)))?;
};
members.push(SafekeeperId {
id: NodeId(sk_p.id as u64),
host: sk_p.host.clone(),
pg_port: sk_p.port as u16,
});
}
let mut mconf = safekeeper_api::membership::Configuration::empty();
mconf.members = MemberSet::new(members).map_err(ApiError::InternalServerError)?;
let req = safekeeper_api::models::TimelineCreateRequest {
commit_lsn: None,
mconf,
pg_version: status_creating.pg_version,
start_lsn: status_creating.start_lsn,
system_id: None,
tenant_id,
timeline_id,
wal_seg_size: None,
};
for sk in timeline_persistence.sk_set.iter() {
// Unwrap is fine as we already would have returned error above
let sk_p = sk_persistences.get(sk).unwrap();
let sk_clone = NodeId(*sk as u64);
let base_url = sk_p.base_url();
let jwt = jwt.clone();
let req = req.clone();
let cancel = self.cancel.clone();
joinset.spawn(async move {
let client = SafekeeperClient::new(sk_clone, base_url, jwt);
let req = req;
let retry_result = backoff::retry(
|| client.create_timeline(&req),
|_e| {
// TODO find right criteria here for deciding on retries
false
},
3,
5,
"create timeline on safekeeper",
&cancel,
)
.await;
if let Some(res) = retry_result {
res.map_err(|e| {
ApiError::InternalServerError(
anyhow::Error::new(e).context("error creating timeline on safekeeper"),
)
})
} else {
Err(ApiError::Cancelled)
}
});
}
// After we have built the joinset, we now wait for the tasks to complete,
// but with a specified timeout to make sure we return swiftly, either with
// a failure or success.
const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
let reconcile_deadline = tokio::time::Instant::now() + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT;
// Treat the first two tasks to finish differently, mostly when they timeout,
// because then we won't have a successful quorum.
// For the third task, we don't rely on it succeeding, and we need this to support
// continuing operations even if one safekeeper is down.
let timeout_or_quorum = tokio::time::timeout_at(reconcile_deadline, async {
(
joinset.join_next().await.unwrap(),
joinset.join_next().await.unwrap(),
)
})
.await;
let mut reconcile_results = Vec::new();
match timeout_or_quorum {
Ok((Ok(res_1), Ok(res_2))) => {
reconcile_results.push(res_1);
reconcile_results.push(res_2);
}
Ok((Err(_), Ok(_)) | (_, Err(_))) => {
return Err(ApiError::InternalServerError(anyhow!(
"task was cancelled while reconciling timeline creation"
)));
}
Err(_) => {
return Err(ApiError::InternalServerError(anyhow!(
"couldn't reconcile timeline creation on safekeepers within timeout"
)));
}
}
let timeout_or_last =
tokio::time::timeout_at(reconcile_deadline, joinset.join_next().map(Option::unwrap))
.await;
if let Ok(Ok(res)) = timeout_or_last {
reconcile_results.push(res);
} else {
// No error if cancelled or timed out: we already have feedback from a quorum of safekeepers
tracing::info!("timeout for third reconciliation");
}
// check now if quorum was reached in reconcile_results
let successful = reconcile_results
.into_iter()
.filter_map(|res| res.ok())
.collect::<Vec<_>>();
tracing::info!(
"Got {} successful results from reconciliation",
successful.len()
);
let status_kind = if successful.len() < 2 {
// Failure
return Err(ApiError::InternalServerError(anyhow!(
"not enough successful reconciliations to reach quorum, please retry: {}",
successful.len()
)));
} else if successful.len() == 3 {
// Success, state of timeline is Created
TimelineStatusKind::Created
} else if successful.len() == 2 {
// Success, state of timeline remains Creating
TimelineStatusKind::Creating
} else {
unreachable!(
"unexpected number of successful reconciliations {}",
successful.len()
);
};
// notify cplane about creation
// TODO
self.persistence
.update_timeline_status(tenant_id, timeline_id, status_kind, "{}".to_owned())
.await?;
Ok(())
}
async fn tenant_timeline_create_safekeepers(
&self,
tenant_id: TenantId,
timeline_info: &TimelineInfo,
create_mode: models::TimelineCreateRequestMode,
) -> Result<(SafekeeperGeneration, Vec<NodeId>), ApiError> {
let timeline_id = timeline_info.timeline_id;
let pg_version = timeline_info.pg_version;
let start_lsn = match create_mode {
models::TimelineCreateRequestMode::Bootstrap { .. } => timeline_info.last_record_lsn,
models::TimelineCreateRequestMode::Branch { .. } => timeline_info.last_record_lsn,
models::TimelineCreateRequestMode::ImportPgdata { .. } => {
return Err(ApiError::InternalServerError(anyhow!(
"import pgdata doesn't specify the start lsn, aborting creation on safekeepers"
)))?;
}
};
// Choose initial set of safekeepers respecting affinity
let sks = self.safekeepers_for_new_timeline().await?;
let sks_persistence = sks.iter().map(|sk| sk.0 as i64).collect::<Vec<_>>();
let status_creating = TimelineStatusCreating {
pg_version,
start_lsn,
};
let status = serde_json::to_string(&status_creating).unwrap();
// Add timeline to db
let timeline_persist = TimelinePersistence {
tenant_id: tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
generation: 0,
sk_set: sks_persistence.clone(),
cplane_notified_generation: 0,
status_kind: String::from(TimelineStatusKind::Creating),
status,
};
self.persistence
.insert_timeline(timeline_persist.clone())
.await?;
let sk_persistences = self
.persistence
.list_safekeepers()
.await?
.into_iter()
.map(|p| (p.id, p))
.collect::<HashMap<_, _>>();
self.tenant_timeline_create_safekeepers_reconcile(
tenant_id,
timeline_id,
&timeline_persist,
&status_creating,
&sk_persistences,
)
.await?;
Ok((SafekeeperGeneration::new(0), sks))
}
pub(crate) async fn tenant_timeline_create(
&self,
tenant_id: TenantId,
create_req: TimelineCreateRequest,
) -> Result<TimelineCreateResponseStorcon, ApiError> {
let safekeepers = create_req.safekeepers.unwrap_or_default();
tracing::info!(
%safekeepers,
"Creating timeline {}/{}",
tenant_id,
create_req.new_timeline_id,
);
let _tenant_lock = trace_shared_lock(
&self.tenant_op_locks,
tenant_id,
TenantOperations::TimelineCreate,
)
.await;
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
let create_mode = create_req.mode.clone();
let timeline_info = self
.tenant_timeline_create_pageservers(tenant_id, create_req)
.await?;
let (safekeepers_generation, safekeepers) = if safekeepers {
let res = self
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info, create_mode)
.await?;
(Some(res.0.into_inner()), Some(res.1))
} else {
(None, None)
};
Ok(TimelineCreateResponseStorcon {
timeline_info,
safekeepers_generation,
safekeepers,
})
.await?
}
pub(crate) async fn tenant_timeline_archival_config(
@@ -3870,6 +4120,187 @@ impl Service {
Ok(result)
}
async fn tenant_timeline_delete_safekeepers_reconcile(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
tl_p: &TimelinePersistence,
sk_persistences: &HashMap<i64, SafekeeperPersistence>,
) -> Result<(), ApiError> {
// If at least one deletion succeeded, return if we are outside of a specified timeout
let jwt = self.config.jwt_token.clone().map(SecretString::from);
let mut joinset = JoinSet::new();
let mut members = Vec::new();
for sk in tl_p.sk_set.iter() {
let Some(sk_p) = sk_persistences.get(sk) else {
return Err(ApiError::InternalServerError(anyhow!(
"couldn't find persisted entry for safekeeper with id {sk}"
)))?;
};
members.push(SafekeeperId {
id: NodeId(sk_p.id as u64),
host: sk_p.host.clone(),
pg_port: sk_p.port as u16,
});
}
let sks_to_reconcile = &tl_p.sk_set;
for sk in sks_to_reconcile.iter() {
// Unwrap is fine as we already would have returned error above
let sk_p = sk_persistences.get(sk).unwrap();
let sk_clone = NodeId(*sk as u64);
let base_url = sk_p.base_url();
let jwt = jwt.clone();
let cancel = self.cancel.clone();
joinset.spawn(async move {
let client = SafekeeperClient::new(sk_clone, base_url, jwt);
let retry_result = backoff::retry(
|| client.delete_timeline(tenant_id, timeline_id),
|_e| {
// TODO find right criteria here for deciding on retries
false
},
3,
5,
"delete timeline on safekeeper",
&cancel,
)
.await;
if let Some(res) = retry_result {
res.map_err(|e| {
ApiError::InternalServerError(
anyhow::Error::new(e).context("error deleting timeline on safekeeper"),
)
})
} else {
Err(ApiError::Cancelled)
}
});
}
// After we have built the joinset, we now wait for the tasks to complete,
// but with a specified timeout to make sure we return swiftly, either with
// a failure or success.
const SK_DELETE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
let reconcile_deadline = tokio::time::Instant::now() + SK_DELETE_TIMELINE_RECONCILE_TIMEOUT;
// Treat the first task to finish differently, mostly when it times out,
// because then we won't have any successful deletion.
// For the second and third task, we don't rely on them succeeding, and we need this to support
// continuing operations even if a safekeeper is down.
let timeout_or_first = tokio::time::timeout_at(reconcile_deadline, async {
joinset.join_next().await.unwrap()
})
.await;
let mut reconcile_results = Vec::new();
match timeout_or_first {
Ok(Ok(res_1)) => {
reconcile_results.push(res_1);
}
Ok(Err(_)) => {
return Err(ApiError::InternalServerError(anyhow!(
"task was cancelled while reconciling timeline deletion"
)));
}
Err(_) => {
return Err(ApiError::InternalServerError(anyhow!(
"couldn't reconcile timeline deletion on safekeepers within timeout"
)));
}
}
let timeout_or_last = tokio::time::timeout_at(reconcile_deadline, async {
while let Some(next_res) = joinset.join_next().await {
match next_res {
Ok(res) => {
reconcile_results.push(res);
}
Err(e) => {
tracing::info!("aborting reconciliation due to join error: {e:?}");
break;
}
}
}
});
if let Err(e) = timeout_or_last.await {
// No error if cancelled or timed out: we already have feedback from a quorum of safekeepers
tracing::info!(
"timeout for last {} reconciliations: {e}",
sks_to_reconcile.len() - 1
);
}
// check now if quorum was reached in reconcile_results
let successful = reconcile_results
.into_iter()
.filter_map(|res| res.ok())
.collect::<Vec<_>>();
tracing::info!(
"Got {} successful results from reconciliation",
successful.len()
);
let new_status_kind = if successful.is_empty() {
// Failure
return Err(ApiError::InternalServerError(anyhow!(
"not enough successful reconciliations to reach quorum, please retry: {}",
successful.len()
)));
} else if successful.len() == sks_to_reconcile.len() {
// Success, state of timeline is Deleted
TimelineStatusKind::Deleted
} else if successful.len() == 2 {
// Success, state of timeline remains Creating
TimelineStatusKind::Deleting
} else {
unreachable!(
"unexpected number of successful reconciliations {}",
successful.len()
);
};
if new_status_kind == TimelineStatusKind::Deleted {
self.persistence
.update_timeline_status_deleted(tenant_id, timeline_id)
.await?;
}
Ok(())
}
async fn tenant_timeline_delete_safekeepers(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<(), ApiError> {
let tl = self
.persistence
.get_timeline(tenant_id, timeline_id)
.await?;
let Some(tl) = tl else {
tracing::info!("timeline {tenant_id}/{timeline_id} doesn't exist in timelines table, no deletions on safekeepers needed");
return Ok(());
};
let status_kind =
TimelineStatusKind::from_str(&tl.status_kind).map_err(ApiError::InternalServerError)?;
if status_kind != TimelineStatusKind::Deleting {
// Set status to deleting
let new_status_kind = TimelineStatusKind::Deleting;
self.persistence
.update_timeline_status(tenant_id, timeline_id, new_status_kind, "{}".to_owned())
.await?;
}
let sk_persistences = self
.persistence
.list_safekeepers()
.await?
.into_iter()
.map(|p| (p.id, p))
.collect::<HashMap<_, _>>();
self.tenant_timeline_delete_safekeepers_reconcile(
tenant_id,
timeline_id,
&tl,
&sk_persistences,
)
.await?;
Ok(())
}
pub(crate) async fn tenant_timeline_delete(
&self,
tenant_id: TenantId,
@@ -3883,7 +4314,7 @@ impl Service {
)
.await;
self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
let ps_fut = self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
if targets.0.is_empty() {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant not found").into(),
@@ -3955,7 +4386,13 @@ impl Service {
)
.await?;
Ok(shard_zero_status)
}).await?
});
let sk_fut = self.tenant_timeline_delete_safekeepers(tenant_id, timeline_id);
let (ps_res, sk_res) = tokio::join!(ps_fut, sk_fut);
sk_res?;
ps_res?
}
/// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this
@@ -7653,6 +8090,32 @@ impl Service {
global_observed
}
pub(crate) async fn safekeepers_for_new_timeline(&self) -> Result<Vec<NodeId>, ApiError> {
let mut all_safekeepers = self
.persistence
.list_safekeepers_with_timeline_count()
.await?;
all_safekeepers.sort_by_key(|sk| sk.2);
let mut sks = Vec::new();
let mut azs = HashSet::new();
for (sk_id, az_id, _timeline_count) in all_safekeepers.iter() {
if !azs.insert(az_id) {
continue;
}
sks.push(*sk_id);
if sks.len() == 3 {
break;
}
}
if sks.len() == 3 {
Ok(sks)
} else {
Err(ApiError::InternalServerError(anyhow!(
"couldn't find three safekeepers in different AZs for new timeline"
)))
}
}
pub(crate) async fn safekeepers_list(
&self,
) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {

View File

@@ -0,0 +1,130 @@
use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{
failpoint_support,
id::{TenantId, TimelineId},
};
use crate::{
id_lock_map::trace_shared_lock,
persistence::SafekeeperPersistence,
service::{TenantOperations, TimelineStatusCreating, TimelineStatusKind},
};
use super::{Service, TimelinePersistence};
pub struct SafekeeperReconciler {
service: Arc<Service>,
duration: Duration,
}
impl SafekeeperReconciler {
pub fn new(service: Arc<Service>, duration: Duration) -> Self {
SafekeeperReconciler { service, duration }
}
pub async fn run(&self, cancel: CancellationToken) {
while !cancel.is_cancelled() {
tokio::select! {
_ = tokio::time::sleep(self.duration) => (),
_ = cancel.cancelled() => break,
}
match self.reconcile_iteration(&cancel).await {
Ok(()) => (),
Err(e) => {
tracing::warn!("Error during safekeeper reconciliation: {e:?}");
}
}
}
}
async fn reconcile_iteration(&self, cancel: &CancellationToken) -> Result<(), anyhow::Error> {
let work_list = self
.service
.persistence
.timelines_to_be_reconciled()
.await?;
if work_list.is_empty() {
return Ok(());
}
let sk_persistences = self
.service
.persistence
.list_safekeepers()
.await?
.into_iter()
.map(|p| (p.id, p))
.collect::<HashMap<_, _>>();
for tl in work_list {
let reconcile_fut =
self.reconcile_timeline(&tl, &sk_persistences)
.instrument(tracing::info_span!(
"safekeeper_reconcile_timeline",
timeline_id = tl.timeline_id,
tenant_id = tl.tenant_id
));
tokio::select! {
r = reconcile_fut => r?,
_ = cancel.cancelled() => break,
}
}
Ok(())
}
async fn reconcile_timeline(
&self,
tl: &TimelinePersistence,
sk_persistences: &HashMap<i64, SafekeeperPersistence>,
) -> Result<(), anyhow::Error> {
tracing::info!("Reconciling timeline on safekeepers");
let tenant_id = TenantId::from_slice(tl.tenant_id.as_bytes())?;
let timeline_id = TimelineId::from_slice(tl.timeline_id.as_bytes())?;
let _tenant_lock = trace_shared_lock(
&self.service.tenant_op_locks,
tenant_id,
TenantOperations::TimelineReconcile,
)
.await;
failpoint_support::sleep_millis_async!("safekeeper-reconcile-timeline-shared-lock");
// Load the timeline again from the db: unless we hold the tenant lock, the timeline can change under our noses.
let tl = self
.service
.persistence
.get_timeline(tenant_id, timeline_id)
.await?;
let Some(tl) = tl else {
// This can happen but is a bit unlikely, so print it on the warn level instead of info
tracing::warn!("timeline row in database disappeared");
return Ok(());
};
let status = TimelineStatusKind::from_str(&tl.status)?;
match status {
TimelineStatusKind::Created | TimelineStatusKind::Deleted => (),
TimelineStatusKind::Creating => {
let status_creating: TimelineStatusCreating = serde_json::from_str(&tl.status)?;
self.service
.tenant_timeline_create_safekeepers_reconcile(
tenant_id,
timeline_id,
&tl,
&status_creating,
sk_persistences,
)
.await?;
}
TimelineStatusKind::Deleting => {
self.service
.tenant_timeline_delete_safekeepers_reconcile(
tenant_id,
timeline_id,
&tl,
sk_persistences,
)
.await?;
}
}
Ok(())
}
}