mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
Compare commits
23 Commits
skyzh/comp
...
arpad/sk_t
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c660d787e4 | ||
|
|
ad56b9f76b | ||
|
|
aec92bfc34 | ||
|
|
b0b4b7dd8f | ||
|
|
4dd4096f11 | ||
|
|
1745fe5c65 | ||
|
|
be718ed121 | ||
|
|
986db002cd | ||
|
|
7ec08ee805 | ||
|
|
e56aa822e1 | ||
|
|
f0777cf7ac | ||
|
|
a63153f4bc | ||
|
|
00380cedd7 | ||
|
|
bf0d53aa2d | ||
|
|
ebe9ba0cdf | ||
|
|
1ffe95c837 | ||
|
|
b5c29806f0 | ||
|
|
f0fe5fae6b | ||
|
|
e805058364 | ||
|
|
78c4a82331 | ||
|
|
e35a726b32 | ||
|
|
3d81af8975 | ||
|
|
7d296b3cea |
161
Cargo.lock
generated
161
Cargo.lock
generated
@@ -942,6 +942,18 @@ version = "1.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b"
|
||||
|
||||
[[package]]
|
||||
name = "bb8"
|
||||
version = "0.8.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d89aabfae550a5c44b43ab941844ffcd2e993cb6900b342debf59e9ea74acdb8"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"futures-util",
|
||||
"parking_lot 0.12.1",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bcder"
|
||||
version = "0.7.4"
|
||||
@@ -1301,7 +1313,7 @@ dependencies = [
|
||||
"tar",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"tokio-postgres 0.7.7",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"tower 0.5.2",
|
||||
@@ -1410,7 +1422,7 @@ dependencies = [
|
||||
"storage_broker",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"tokio-postgres 0.7.7",
|
||||
"tokio-util",
|
||||
"toml",
|
||||
"toml_edit",
|
||||
@@ -1786,11 +1798,24 @@ dependencies = [
|
||||
"chrono",
|
||||
"diesel_derives",
|
||||
"itoa",
|
||||
"pq-sys",
|
||||
"r2d2",
|
||||
"serde_json",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "diesel-async"
|
||||
version = "0.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "51a307ac00f7c23f526a04a77761a0519b9f0eb2838ebf5b905a58580095bdcb"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bb8",
|
||||
"diesel",
|
||||
"futures-util",
|
||||
"scoped-futures",
|
||||
"tokio",
|
||||
"tokio-postgres 0.7.12",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "diesel_derives"
|
||||
version = "2.2.1"
|
||||
@@ -4042,8 +4067,8 @@ dependencies = [
|
||||
"pageserver_compaction",
|
||||
"pin-project-lite",
|
||||
"postgres",
|
||||
"postgres-protocol",
|
||||
"postgres-types",
|
||||
"postgres-protocol 0.6.4",
|
||||
"postgres-types 0.2.4",
|
||||
"postgres_backend",
|
||||
"postgres_connection",
|
||||
"postgres_ffi",
|
||||
@@ -4074,7 +4099,7 @@ dependencies = [
|
||||
"tokio",
|
||||
"tokio-epoll-uring",
|
||||
"tokio-io-timeout",
|
||||
"tokio-postgres",
|
||||
"tokio-postgres 0.7.7",
|
||||
"tokio-stream",
|
||||
"tokio-tar",
|
||||
"tokio-util",
|
||||
@@ -4132,7 +4157,7 @@ dependencies = [
|
||||
"serde",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"tokio-postgres 0.7.7",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"utils",
|
||||
@@ -4438,7 +4463,7 @@ dependencies = [
|
||||
"futures-util",
|
||||
"log",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"tokio-postgres 0.7.7",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4459,6 +4484,24 @@ dependencies = [
|
||||
"stringprep",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "postgres-protocol"
|
||||
version = "0.6.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "acda0ebdebc28befa84bee35e651e4c5f09073d668c7aed4cf7e23c3cda84b23"
|
||||
dependencies = [
|
||||
"base64 0.22.1",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
"hmac",
|
||||
"md-5",
|
||||
"memchr",
|
||||
"rand 0.8.5",
|
||||
"sha2",
|
||||
"stringprep",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "postgres-protocol2"
|
||||
version = "0.1.0"
|
||||
@@ -4482,7 +4525,18 @@ source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#511f
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
"postgres-protocol",
|
||||
"postgres-protocol 0.6.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "postgres-types"
|
||||
version = "0.2.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f66ea23a2d0e5734297357705193335e0a957696f34bed2f2faefacb2fec336f"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
"postgres-protocol 0.6.7",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4507,7 +4561,7 @@ dependencies = [
|
||||
"serde",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"tokio-postgres 0.7.7",
|
||||
"tokio-postgres-rustls",
|
||||
"tokio-rustls 0.26.0",
|
||||
"tokio-util",
|
||||
@@ -4522,7 +4576,7 @@ dependencies = [
|
||||
"itertools 0.10.5",
|
||||
"once_cell",
|
||||
"postgres",
|
||||
"tokio-postgres",
|
||||
"tokio-postgres 0.7.7",
|
||||
"url",
|
||||
]
|
||||
|
||||
@@ -4609,15 +4663,6 @@ version = "0.2.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
|
||||
|
||||
[[package]]
|
||||
name = "pq-sys"
|
||||
version = "0.6.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f6cc05d7ea95200187117196eee9edd0644424911821aeb28a18ce60ea0b8793"
|
||||
dependencies = [
|
||||
"vcpkg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pq_proto"
|
||||
version = "0.1.0"
|
||||
@@ -4625,7 +4670,7 @@ dependencies = [
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"itertools 0.10.5",
|
||||
"postgres-protocol",
|
||||
"postgres-protocol 0.6.4",
|
||||
"rand 0.8.5",
|
||||
"serde",
|
||||
"thiserror",
|
||||
@@ -4873,7 +4918,7 @@ dependencies = [
|
||||
"tikv-jemalloc-ctl",
|
||||
"tikv-jemallocator",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"tokio-postgres 0.7.7",
|
||||
"tokio-postgres2",
|
||||
"tokio-rustls 0.26.0",
|
||||
"tokio-tungstenite 0.21.0",
|
||||
@@ -4930,17 +4975,6 @@ dependencies = [
|
||||
"proc-macro2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "r2d2"
|
||||
version = "0.8.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
|
||||
dependencies = [
|
||||
"log",
|
||||
"parking_lot 0.12.1",
|
||||
"scheduled-thread-pool",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand"
|
||||
version = "0.7.3"
|
||||
@@ -5672,7 +5706,7 @@ dependencies = [
|
||||
"pageserver_api",
|
||||
"parking_lot 0.12.1",
|
||||
"postgres",
|
||||
"postgres-protocol",
|
||||
"postgres-protocol 0.6.4",
|
||||
"postgres_backend",
|
||||
"postgres_ffi",
|
||||
"pprof",
|
||||
@@ -5696,7 +5730,7 @@ dependencies = [
|
||||
"tikv-jemallocator",
|
||||
"tokio",
|
||||
"tokio-io-timeout",
|
||||
"tokio-postgres",
|
||||
"tokio-postgres 0.7.7",
|
||||
"tokio-stream",
|
||||
"tokio-tar",
|
||||
"tokio-util",
|
||||
@@ -5755,12 +5789,12 @@ dependencies = [
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scheduled-thread-pool"
|
||||
version = "0.2.7"
|
||||
name = "scoped-futures"
|
||||
version = "0.1.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
|
||||
checksum = "1b24aae2d0636530f359e9d5ef0c04669d11c5e756699b27a6a6d845d8329091"
|
||||
dependencies = [
|
||||
"parking_lot 0.12.1",
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -6295,6 +6329,7 @@ dependencies = [
|
||||
"clap",
|
||||
"control_plane",
|
||||
"diesel",
|
||||
"diesel-async",
|
||||
"diesel_migrations",
|
||||
"fail",
|
||||
"futures",
|
||||
@@ -6309,10 +6344,12 @@ dependencies = [
|
||||
"pageserver_api",
|
||||
"pageserver_client",
|
||||
"postgres_connection",
|
||||
"r2d2",
|
||||
"rand 0.8.5",
|
||||
"reqwest",
|
||||
"routerify",
|
||||
"safekeeper_api",
|
||||
"safekeeper_client",
|
||||
"scoped-futures",
|
||||
"scopeguard",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -6365,7 +6402,7 @@ dependencies = [
|
||||
"serde_json",
|
||||
"storage_controller_client",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"tokio-postgres 0.7.7",
|
||||
"tokio-postgres-rustls",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
@@ -6824,13 +6861,39 @@ dependencies = [
|
||||
"percent-encoding",
|
||||
"phf",
|
||||
"pin-project-lite",
|
||||
"postgres-protocol",
|
||||
"postgres-types",
|
||||
"postgres-protocol 0.6.4",
|
||||
"postgres-types 0.2.4",
|
||||
"socket2",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-postgres"
|
||||
version = "0.7.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3b5d3742945bc7d7f210693b0c58ae542c6fd47b17adbbda0885f3dcb34a6bdb"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
"futures-channel",
|
||||
"futures-util",
|
||||
"log",
|
||||
"parking_lot 0.12.1",
|
||||
"percent-encoding",
|
||||
"phf",
|
||||
"pin-project-lite",
|
||||
"postgres-protocol 0.6.7",
|
||||
"postgres-types 0.2.8",
|
||||
"rand 0.8.5",
|
||||
"socket2",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"whoami",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-postgres-rustls"
|
||||
version = "0.12.0"
|
||||
@@ -6840,7 +6903,7 @@ dependencies = [
|
||||
"ring",
|
||||
"rustls 0.23.18",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"tokio-postgres 0.7.7",
|
||||
"tokio-rustls 0.26.0",
|
||||
"x509-certificate",
|
||||
]
|
||||
@@ -7498,12 +7561,6 @@ version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
|
||||
|
||||
[[package]]
|
||||
name = "vcpkg"
|
||||
version = "0.2.15"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
|
||||
|
||||
[[package]]
|
||||
name = "version_check"
|
||||
version = "0.9.4"
|
||||
@@ -7523,7 +7580,7 @@ dependencies = [
|
||||
"serde_json",
|
||||
"sysinfo",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"tokio-postgres 0.7.7",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
|
||||
2
Makefile
2
Makefile
@@ -64,8 +64,6 @@ CARGO_BUILD_FLAGS += $(filter -j1,$(MAKEFLAGS))
|
||||
CARGO_CMD_PREFIX += $(if $(filter n,$(MAKEFLAGS)),,+)
|
||||
# Force cargo not to print progress bar
|
||||
CARGO_CMD_PREFIX += CARGO_TERM_PROGRESS_WHEN=never CI=1
|
||||
# Set PQ_LIB_DIR to make sure `storage_controller` get linked with bundled libpq (through diesel)
|
||||
CARGO_CMD_PREFIX += PQ_LIB_DIR=$(POSTGRES_INSTALL_DIR)/v16/lib
|
||||
|
||||
CACHEDIR_TAG_CONTENTS := "Signature: 8a477f597d28d172789f06886806bc55"
|
||||
|
||||
|
||||
@@ -1266,11 +1266,12 @@ RUN set -e \
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layers "postgres-exporter" and "sql-exporter"
|
||||
# Layers "postgres-exporter", "pgbouncer-exporter", and "sql-exporter"
|
||||
#
|
||||
#########################################################################################
|
||||
|
||||
FROM quay.io/prometheuscommunity/postgres-exporter:v0.16.0 AS postgres-exporter
|
||||
FROM quay.io/prometheuscommunity/pgbouncer-exporter:v0.10.2 AS pgbouncer-exporter
|
||||
|
||||
# Keep the version the same as in build-tools.Dockerfile and
|
||||
# test_runner/regress/test_compute_metrics.py.
|
||||
@@ -1402,6 +1403,7 @@ RUN mkdir -p /etc/local_proxy && chown postgres:postgres /etc/local_proxy
|
||||
|
||||
# Metrics exporter binaries and configuration files
|
||||
COPY --from=postgres-exporter /bin/postgres_exporter /bin/postgres_exporter
|
||||
COPY --from=pgbouncer-exporter /bin/pgbouncer_exporter /bin/pgbouncer_exporter
|
||||
COPY --from=sql-exporter /bin/sql_exporter /bin/sql_exporter
|
||||
|
||||
COPY --chown=postgres compute/etc/postgres_exporter.yml /etc/postgres_exporter.yml
|
||||
|
||||
@@ -19,6 +19,8 @@ max_prepared_statements=0
|
||||
admin_users=postgres
|
||||
unix_socket_dir=/tmp/
|
||||
unix_socket_mode=0777
|
||||
; required for pgbouncer_exporter
|
||||
ignore_startup_parameters=extra_float_digits
|
||||
|
||||
;; Disable connection logging. It produces a lot of logs that no one looks at,
|
||||
;; and we can get similar log entries from the proxy too. We had incidents in
|
||||
|
||||
@@ -27,6 +27,10 @@ commands:
|
||||
user: nobody
|
||||
sysvInitAction: respawn
|
||||
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter" /bin/postgres_exporter --config.file=/etc/postgres_exporter.yml'
|
||||
- name: pgbouncer-exporter
|
||||
user: postgres
|
||||
sysvInitAction: respawn
|
||||
shell: '/bin/pgbouncer_exporter --pgBouncer.connectionString="postgres:///pgbouncer?host=/tmp&port=6432&dbname=pgbouncer&user=pgbouncer"'
|
||||
- name: sql-exporter
|
||||
user: nobody
|
||||
sysvInitAction: respawn
|
||||
|
||||
@@ -27,6 +27,10 @@ commands:
|
||||
user: nobody
|
||||
sysvInitAction: respawn
|
||||
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter" /bin/postgres_exporter --config.file=/etc/postgres_exporter.yml'
|
||||
- name: pgbouncer-exporter
|
||||
user: postgres
|
||||
sysvInitAction: respawn
|
||||
shell: '/bin/pgbouncer_exporter --pgBouncer.connectionString="postgres:///pgbouncer?host=/tmp&port=6432&dbname=pgbouncer&user=pgbouncer"'
|
||||
- name: sql-exporter
|
||||
user: nobody
|
||||
sysvInitAction: respawn
|
||||
|
||||
@@ -1104,6 +1104,7 @@ async fn handle_tenant(subcmd: &TenantCmd, env: &mut local_env::LocalEnv) -> any
|
||||
existing_initdb_timeline_id: None,
|
||||
pg_version: Some(args.pg_version),
|
||||
},
|
||||
safekeepers: None,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
@@ -1164,6 +1165,7 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re
|
||||
existing_initdb_timeline_id: None,
|
||||
pg_version: Some(args.pg_version),
|
||||
},
|
||||
safekeepers: None,
|
||||
};
|
||||
let timeline_info = storage_controller
|
||||
.tenant_timeline_create(tenant_id, create_req)
|
||||
@@ -1222,6 +1224,7 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re
|
||||
ancestor_start_lsn: start_lsn,
|
||||
pg_version: None,
|
||||
},
|
||||
safekeepers: None,
|
||||
};
|
||||
let timeline_info = storage_controller
|
||||
.tenant_timeline_create(tenant_id, create_req)
|
||||
|
||||
@@ -280,6 +280,18 @@ pub struct TimelineCreateRequest {
|
||||
pub new_timeline_id: TimelineId,
|
||||
#[serde(flatten)]
|
||||
pub mode: TimelineCreateRequestMode,
|
||||
/// Whether to also create timeline on the safekeepers (specific to storcon API)
|
||||
pub safekeepers: Option<bool>,
|
||||
}
|
||||
|
||||
/// Storage controller specific extensions to [`TimelineInfo`].
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct TimelineCreateResponseStorcon {
|
||||
#[serde(flatten)]
|
||||
pub timeline_info: TimelineInfo,
|
||||
|
||||
pub safekeepers: Option<Vec<NodeId>>,
|
||||
pub safekeepers_generation: Option<u32>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
|
||||
@@ -19,7 +19,7 @@ pub struct SafekeeperStatus {
|
||||
pub id: NodeId,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct TimelineCreateRequest {
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
|
||||
@@ -144,6 +144,30 @@ impl Debug for Generation {
|
||||
}
|
||||
}
|
||||
|
||||
/// Like tenant generations, but for safekeepers
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
|
||||
pub struct SafekeeperGeneration(u32);
|
||||
|
||||
impl SafekeeperGeneration {
|
||||
pub const fn new(v: u32) -> Self {
|
||||
Self(v)
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
pub fn previous(&self) -> Option<Self> {
|
||||
Some(Self(self.0.checked_sub(1)?))
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
pub fn next(&self) -> Self {
|
||||
Self(self.0 + 1)
|
||||
}
|
||||
|
||||
pub fn into_inner(self) -> u32 {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
@@ -2168,8 +2168,8 @@ impl Timeline {
|
||||
}
|
||||
|
||||
fn get_l0_flush_delay_threshold(&self) -> Option<usize> {
|
||||
// Default to delay L0 flushes at 2x compaction threshold.
|
||||
const DEFAULT_L0_FLUSH_DELAY_FACTOR: usize = 2;
|
||||
// Default to delay L0 flushes at 3x compaction threshold.
|
||||
const DEFAULT_L0_FLUSH_DELAY_FACTOR: usize = 3;
|
||||
|
||||
// If compaction is disabled, don't delay.
|
||||
if self.get_compaction_period() == Duration::ZERO {
|
||||
@@ -2197,8 +2197,10 @@ impl Timeline {
|
||||
}
|
||||
|
||||
fn get_l0_flush_stall_threshold(&self) -> Option<usize> {
|
||||
// Default to stall L0 flushes at 4x compaction threshold.
|
||||
const DEFAULT_L0_FLUSH_STALL_FACTOR: usize = 4;
|
||||
// Default to stall L0 flushes at 5x compaction threshold.
|
||||
// TODO: stalls are temporarily disabled by default, see below.
|
||||
#[allow(unused)]
|
||||
const DEFAULT_L0_FLUSH_STALL_FACTOR: usize = 5;
|
||||
|
||||
// If compaction is disabled, don't stall.
|
||||
if self.get_compaction_period() == Duration::ZERO {
|
||||
@@ -2230,8 +2232,13 @@ impl Timeline {
|
||||
return None;
|
||||
}
|
||||
|
||||
let l0_flush_stall_threshold = l0_flush_stall_threshold
|
||||
.unwrap_or(DEFAULT_L0_FLUSH_STALL_FACTOR * compaction_threshold);
|
||||
// Disable stalls by default. In ingest benchmarks, we see image compaction take >10
|
||||
// minutes, blocking L0 compaction, and we can't stall L0 flushes for that long.
|
||||
//
|
||||
// TODO: fix this.
|
||||
// let l0_flush_stall_threshold = l0_flush_stall_threshold
|
||||
// .unwrap_or(DEFAULT_L0_FLUSH_STALL_FACTOR * compaction_threshold);
|
||||
let l0_flush_stall_threshold = l0_flush_stall_threshold?;
|
||||
|
||||
// 0 disables backpressure.
|
||||
if l0_flush_stall_threshold == 0 {
|
||||
|
||||
@@ -49,7 +49,7 @@ pub(crate) fn regenerate(
|
||||
};
|
||||
|
||||
// Express a static value for how many shards we may schedule on one node
|
||||
const MAX_SHARDS: u32 = 20000;
|
||||
const MAX_SHARDS: u32 = 5000;
|
||||
|
||||
let mut doc = PageserverUtilization {
|
||||
disk_usage_bytes: used,
|
||||
|
||||
@@ -32,6 +32,8 @@ postgres_connection.workspace = true
|
||||
rand.workspace = true
|
||||
reqwest = { workspace = true, features = ["stream"] }
|
||||
routerify.workspace = true
|
||||
safekeeper_api.workspace = true
|
||||
safekeeper_client.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
thiserror.workspace = true
|
||||
@@ -45,12 +47,11 @@ strum_macros.workspace = true
|
||||
|
||||
diesel = { version = "2.2.6", features = [
|
||||
"serde_json",
|
||||
"postgres",
|
||||
"r2d2",
|
||||
"chrono",
|
||||
] }
|
||||
diesel-async = { version = "0.5.2", features = ["postgres", "bb8", "async-connection-wrapper"] }
|
||||
diesel_migrations = { version = "2.2.0" }
|
||||
r2d2 = { version = "0.8.10" }
|
||||
scoped-futures = "0.1.4"
|
||||
|
||||
utils = { path = "../libs/utils/" }
|
||||
metrics = { path = "../libs/metrics/" }
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
DROP TABLE timelines;
|
||||
@@ -0,0 +1,12 @@
|
||||
CREATE TABLE timelines (
|
||||
tenant_id VARCHAR NOT NULL,
|
||||
timeline_id VARCHAR NOT NULL,
|
||||
PRIMARY KEY(tenant_id, timeline_id),
|
||||
generation INTEGER NOT NULL,
|
||||
sk_set BIGINT[] NOT NULL,
|
||||
cplane_notified_generation INTEGER NOT NULL,
|
||||
status_kind VARCHAR NOT NULL,
|
||||
status VARCHAR NOT NULL,
|
||||
deleted_at timestamptz
|
||||
);
|
||||
CREATE INDEX timelines_idx ON timelines(status_kind, deleted_at, tenant_id, timeline_id);
|
||||
@@ -17,6 +17,7 @@ mod pageserver_client;
|
||||
mod peer_client;
|
||||
pub mod persistence;
|
||||
mod reconciler;
|
||||
mod safekeeper_client;
|
||||
mod scheduler;
|
||||
mod schema;
|
||||
pub mod service;
|
||||
|
||||
@@ -10,6 +10,7 @@ use storage_controller::http::make_router;
|
||||
use storage_controller::metrics::preinitialize_metrics;
|
||||
use storage_controller::persistence::Persistence;
|
||||
use storage_controller::service::chaos_injector::ChaosInjector;
|
||||
use storage_controller::service::safekeeper_reconciler::SafekeeperReconciler;
|
||||
use storage_controller::service::{
|
||||
Config, Service, HEARTBEAT_INTERVAL_DEFAULT, LONG_RECONCILE_THRESHOLD_DEFAULT,
|
||||
MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT,
|
||||
@@ -308,7 +309,7 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
// Validate that we can connect to the database
|
||||
Persistence::await_connection(&secrets.database_url, args.db_connect_timeout.into()).await?;
|
||||
|
||||
let persistence = Arc::new(Persistence::new(secrets.database_url));
|
||||
let persistence = Arc::new(Persistence::new(secrets.database_url).await);
|
||||
|
||||
let service = Service::spawn(config, persistence.clone()).await?;
|
||||
|
||||
@@ -351,6 +352,24 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
)
|
||||
});
|
||||
|
||||
const SAFEKEEPER_RECONCILER_INTERVAL: Duration = Duration::from_secs(120);
|
||||
let safekeeper_reconciler_task = {
|
||||
let service = service.clone();
|
||||
let cancel = CancellationToken::new();
|
||||
let cancel_bg = cancel.clone();
|
||||
(
|
||||
tokio::task::spawn(
|
||||
async move {
|
||||
let reconciler =
|
||||
SafekeeperReconciler::new(service, SAFEKEEPER_RECONCILER_INTERVAL);
|
||||
reconciler.run(cancel_bg).await
|
||||
}
|
||||
.instrument(tracing::info_span!("safekeeper_reconciler")),
|
||||
),
|
||||
cancel,
|
||||
)
|
||||
};
|
||||
|
||||
// Wait until we receive a signal
|
||||
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;
|
||||
let mut sigquit = tokio::signal::unix::signal(SignalKind::quit())?;
|
||||
@@ -384,6 +403,11 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
chaos_cancel.cancel();
|
||||
chaos_jh.await.ok();
|
||||
}
|
||||
// Do the same for the safekeeper reconciler
|
||||
{
|
||||
safekeeper_reconciler_task.1.cancel();
|
||||
_ = safekeeper_reconciler_task.0.await;
|
||||
}
|
||||
|
||||
service.shutdown().await;
|
||||
tracing::info!("Service shutdown complete");
|
||||
|
||||
@@ -80,6 +80,11 @@ pub(crate) struct StorageControllerMetricGroup {
|
||||
pub(crate) storage_controller_pageserver_request_error:
|
||||
measured::CounterVec<PageserverRequestLabelGroupSet>,
|
||||
|
||||
/// Count of HTTP requests to the safekeeper that resulted in an error,
|
||||
/// broken down by the safekeeper node id, request name and method
|
||||
pub(crate) storage_controller_safekeeper_request_error:
|
||||
measured::CounterVec<PageserverRequestLabelGroupSet>,
|
||||
|
||||
/// Latency of HTTP requests to the pageserver, broken down by pageserver
|
||||
/// node id, request name and method. This include both successful and unsuccessful
|
||||
/// requests.
|
||||
@@ -87,6 +92,13 @@ pub(crate) struct StorageControllerMetricGroup {
|
||||
pub(crate) storage_controller_pageserver_request_latency:
|
||||
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,
|
||||
|
||||
/// Latency of HTTP requests to the safekeeper, broken down by safekeeper
|
||||
/// node id, request name and method. This include both successful and unsuccessful
|
||||
/// requests.
|
||||
#[metric(metadata = histogram::Thresholds::exponential_buckets(0.1, 2.0))]
|
||||
pub(crate) storage_controller_safekeeper_request_latency:
|
||||
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,
|
||||
|
||||
/// Count of pass-through HTTP requests to the pageserver that resulted in an error,
|
||||
/// broken down by the pageserver node id, request name and method
|
||||
pub(crate) storage_controller_passthrough_request_error:
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
94
storage_controller/src/safekeeper_client.rs
Normal file
94
storage_controller/src/safekeeper_client.rs
Normal file
@@ -0,0 +1,94 @@
|
||||
use crate::metrics::PageserverRequestLabelGroup;
|
||||
use safekeeper_api::models::{TimelineCreateRequest, TimelineStatus};
|
||||
use safekeeper_client::mgmt_api::{Client, Result};
|
||||
use utils::{
|
||||
id::{NodeId, TenantId, TimelineId},
|
||||
logging::SecretString,
|
||||
};
|
||||
|
||||
/// Thin wrapper around [`safekeeper_client::mgmt_api::Client`]. It allows the storage
|
||||
/// controller to collect metrics in a non-intrusive manner.
|
||||
///
|
||||
/// Analogous to [`crate::pageserver_client::PageserverClient`].
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct SafekeeperClient {
|
||||
inner: Client,
|
||||
node_id_label: String,
|
||||
}
|
||||
|
||||
macro_rules! measured_request {
|
||||
($name:literal, $method:expr, $node_id: expr, $invoke:expr) => {{
|
||||
let labels = PageserverRequestLabelGroup {
|
||||
pageserver_id: $node_id,
|
||||
path: $name,
|
||||
method: $method,
|
||||
};
|
||||
|
||||
let latency = &crate::metrics::METRICS_REGISTRY
|
||||
.metrics_group
|
||||
.storage_controller_safekeeper_request_latency;
|
||||
let _timer_guard = latency.start_timer(labels.clone());
|
||||
|
||||
let res = $invoke;
|
||||
|
||||
if res.is_err() {
|
||||
let error_counters = &crate::metrics::METRICS_REGISTRY
|
||||
.metrics_group
|
||||
.storage_controller_pageserver_request_error;
|
||||
error_counters.inc(labels)
|
||||
}
|
||||
|
||||
res
|
||||
}};
|
||||
}
|
||||
|
||||
impl SafekeeperClient {
|
||||
pub(crate) fn new(
|
||||
node_id: NodeId,
|
||||
mgmt_api_endpoint: String,
|
||||
jwt: Option<SecretString>,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner: Client::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt),
|
||||
node_id_label: node_id.0.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub(crate) fn from_client(
|
||||
node_id: NodeId,
|
||||
raw_client: reqwest::Client,
|
||||
mgmt_api_endpoint: String,
|
||||
jwt: Option<SecretString>,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner: Client::from_client(raw_client, mgmt_api_endpoint, jwt),
|
||||
node_id_label: node_id.0.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn create_timeline(
|
||||
&self,
|
||||
req: &TimelineCreateRequest,
|
||||
) -> Result<TimelineStatus> {
|
||||
measured_request!(
|
||||
"create_timeline",
|
||||
crate::metrics::Method::Post,
|
||||
&self.node_id_label,
|
||||
self.inner.create_timeline(req).await
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_timeline(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Result<TimelineStatus> {
|
||||
measured_request!(
|
||||
"delete_timeline",
|
||||
crate::metrics::Method::Delete,
|
||||
&self.node_id_label,
|
||||
self.inner.delete_timeline(tenant_id, timeline_id).await
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -58,10 +58,24 @@ diesel::table! {
|
||||
}
|
||||
}
|
||||
|
||||
diesel::table! {
|
||||
timelines (tenant_id, timeline_id) {
|
||||
tenant_id -> Varchar,
|
||||
timeline_id -> Varchar,
|
||||
generation -> Int4,
|
||||
sk_set -> Array<Nullable<Int8>>,
|
||||
cplane_notified_generation -> Int4,
|
||||
status_kind -> Varchar,
|
||||
status -> Varchar,
|
||||
deleted_at -> Nullable<Timestamptz>,
|
||||
}
|
||||
}
|
||||
|
||||
diesel::allow_tables_to_appear_in_same_query!(
|
||||
controllers,
|
||||
metadata_health,
|
||||
nodes,
|
||||
safekeepers,
|
||||
tenant_shards,
|
||||
timelines,
|
||||
);
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
pub mod chaos_injector;
|
||||
mod context_iterator;
|
||||
pub mod safekeeper_reconciler;
|
||||
|
||||
use hyper::Uri;
|
||||
use safekeeper_api::membership::{MemberSet, SafekeeperId};
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
cmp::Ordering,
|
||||
@@ -26,21 +28,23 @@ use crate::{
|
||||
peer_client::GlobalObservedState,
|
||||
persistence::{
|
||||
AbortShardSplitStatus, ControllerPersistence, DatabaseResult, MetadataHealthPersistence,
|
||||
ShardGenerationState, TenantFilter,
|
||||
SafekeeperPersistence, ShardGenerationState, TenantFilter, TimelinePersistence,
|
||||
TimelineStatusCreating, TimelineStatusKind,
|
||||
},
|
||||
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
|
||||
safekeeper_client::SafekeeperClient,
|
||||
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
|
||||
tenant_shard::{
|
||||
MigrateAttachment, ObservedStateDelta, ReconcileNeeded, ReconcilerStatus,
|
||||
ScheduleOptimization, ScheduleOptimizationAction,
|
||||
},
|
||||
};
|
||||
use anyhow::Context;
|
||||
use anyhow::{anyhow, Context};
|
||||
use control_plane::storage_controller::{
|
||||
AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse,
|
||||
};
|
||||
use diesel::result::DatabaseErrorKind;
|
||||
use futures::{stream::FuturesUnordered, StreamExt};
|
||||
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::{
|
||||
controller_api::{
|
||||
@@ -54,7 +58,7 @@ use pageserver_api::{
|
||||
},
|
||||
models::{
|
||||
SecondaryProgress, TenantConfigPatchRequest, TenantConfigRequest,
|
||||
TimelineArchivalConfigRequest, TopTenantShardsRequest,
|
||||
TimelineArchivalConfigRequest, TimelineCreateResponseStorcon, TopTenantShardsRequest,
|
||||
},
|
||||
};
|
||||
use reqwest::StatusCode;
|
||||
@@ -75,14 +79,16 @@ use pageserver_api::{
|
||||
},
|
||||
};
|
||||
use pageserver_client::{mgmt_api, BlockUnblock};
|
||||
use tokio::sync::mpsc::error::TrySendError;
|
||||
use tokio::{sync::mpsc::error::TrySendError, task::JoinSet};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::{
|
||||
backoff,
|
||||
completion::Barrier,
|
||||
failpoint_support,
|
||||
generation::Generation,
|
||||
generation::{Generation, SafekeeperGeneration},
|
||||
http::error::ApiError,
|
||||
id::{NodeId, TenantId, TimelineId},
|
||||
logging::SecretString,
|
||||
pausable_failpoint,
|
||||
sync::gate::Gate,
|
||||
};
|
||||
@@ -151,6 +157,7 @@ enum TenantOperations {
|
||||
SecondaryDownload,
|
||||
TimelineCreate,
|
||||
TimelineDelete,
|
||||
TimelineReconcile,
|
||||
AttachHook,
|
||||
TimelineArchivalConfig,
|
||||
TimelineDetachAncestor,
|
||||
@@ -3097,6 +3104,10 @@ impl Service {
|
||||
|
||||
self.maybe_load_tenant(tenant_id, &_tenant_lock).await?;
|
||||
|
||||
self.persistence
|
||||
.mark_timelines_for_deletion(tenant_id)
|
||||
.await?;
|
||||
|
||||
// Detach all shards. This also deletes local pageserver shard data.
|
||||
let (detach_waiters, node) = {
|
||||
let mut detach_waiters = Vec::new();
|
||||
@@ -3269,25 +3280,11 @@ impl Service {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_timeline_create(
|
||||
async fn tenant_timeline_create_pageservers(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
mut create_req: TimelineCreateRequest,
|
||||
) -> Result<TimelineInfo, ApiError> {
|
||||
tracing::info!(
|
||||
"Creating timeline {}/{}",
|
||||
tenant_id,
|
||||
create_req.new_timeline_id,
|
||||
);
|
||||
|
||||
let _tenant_lock = trace_shared_lock(
|
||||
&self.tenant_op_locks,
|
||||
tenant_id,
|
||||
TenantOperations::TimelineCreate,
|
||||
)
|
||||
.await;
|
||||
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
|
||||
|
||||
self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
|
||||
if targets.0.is_empty() {
|
||||
return Err(ApiError::NotFound(
|
||||
@@ -3404,8 +3401,261 @@ impl Service {
|
||||
}
|
||||
|
||||
Ok(timeline_info)
|
||||
}).await?
|
||||
}
|
||||
|
||||
/// reconcile: create timeline on safekeepers
|
||||
///
|
||||
/// Assumes tenant lock is held while calling this function
|
||||
async fn tenant_timeline_create_safekeepers_reconcile(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
timeline_persistence: &TimelinePersistence,
|
||||
status_creating: &TimelineStatusCreating,
|
||||
sk_persistences: &HashMap<i64, SafekeeperPersistence>,
|
||||
) -> Result<(), ApiError> {
|
||||
// If quorum is reached, return if we are outside of a specified timeout
|
||||
let jwt = self.config.jwt_token.clone().map(SecretString::from);
|
||||
let mut joinset = JoinSet::new();
|
||||
|
||||
let mut members = Vec::new();
|
||||
for sk in timeline_persistence.sk_set.iter() {
|
||||
let Some(sk_p) = sk_persistences.get(sk) else {
|
||||
return Err(ApiError::InternalServerError(anyhow!(
|
||||
"couldn't find persisted entry for safekeeper with id {sk}"
|
||||
)))?;
|
||||
};
|
||||
members.push(SafekeeperId {
|
||||
id: NodeId(sk_p.id as u64),
|
||||
host: sk_p.host.clone(),
|
||||
pg_port: sk_p.port as u16,
|
||||
});
|
||||
}
|
||||
let mut mconf = safekeeper_api::membership::Configuration::empty();
|
||||
mconf.members = MemberSet::new(members).map_err(ApiError::InternalServerError)?;
|
||||
|
||||
let req = safekeeper_api::models::TimelineCreateRequest {
|
||||
commit_lsn: None,
|
||||
mconf,
|
||||
pg_version: status_creating.pg_version,
|
||||
start_lsn: status_creating.start_lsn,
|
||||
system_id: None,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
wal_seg_size: None,
|
||||
};
|
||||
for sk in timeline_persistence.sk_set.iter() {
|
||||
// Unwrap is fine as we already would have returned error above
|
||||
let sk_p = sk_persistences.get(sk).unwrap();
|
||||
let sk_clone = NodeId(*sk as u64);
|
||||
let base_url = sk_p.base_url();
|
||||
let jwt = jwt.clone();
|
||||
let req = req.clone();
|
||||
let cancel = self.cancel.clone();
|
||||
joinset.spawn(async move {
|
||||
let client = SafekeeperClient::new(sk_clone, base_url, jwt);
|
||||
let req = req;
|
||||
let retry_result = backoff::retry(
|
||||
|| client.create_timeline(&req),
|
||||
|_e| {
|
||||
// TODO find right criteria here for deciding on retries
|
||||
false
|
||||
},
|
||||
3,
|
||||
5,
|
||||
"create timeline on safekeeper",
|
||||
&cancel,
|
||||
)
|
||||
.await;
|
||||
if let Some(res) = retry_result {
|
||||
res.map_err(|e| {
|
||||
ApiError::InternalServerError(
|
||||
anyhow::Error::new(e).context("error creating timeline on safekeeper"),
|
||||
)
|
||||
})
|
||||
} else {
|
||||
Err(ApiError::Cancelled)
|
||||
}
|
||||
});
|
||||
}
|
||||
// After we have built the joinset, we now wait for the tasks to complete,
|
||||
// but with a specified timeout to make sure we return swiftly, either with
|
||||
// a failure or success.
|
||||
const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
let reconcile_deadline = tokio::time::Instant::now() + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT;
|
||||
|
||||
// Treat the first two tasks to finish differently, mostly when they timeout,
|
||||
// because then we won't have a successful quorum.
|
||||
// For the third task, we don't rely on it succeeding, and we need this to support
|
||||
// continuing operations even if one safekeeper is down.
|
||||
let timeout_or_quorum = tokio::time::timeout_at(reconcile_deadline, async {
|
||||
(
|
||||
joinset.join_next().await.unwrap(),
|
||||
joinset.join_next().await.unwrap(),
|
||||
)
|
||||
})
|
||||
.await;
|
||||
let mut reconcile_results = Vec::new();
|
||||
match timeout_or_quorum {
|
||||
Ok((Ok(res_1), Ok(res_2))) => {
|
||||
reconcile_results.push(res_1);
|
||||
reconcile_results.push(res_2);
|
||||
}
|
||||
Ok((Err(_), Ok(_)) | (_, Err(_))) => {
|
||||
return Err(ApiError::InternalServerError(anyhow!(
|
||||
"task was cancelled while reconciling timeline creation"
|
||||
)));
|
||||
}
|
||||
Err(_) => {
|
||||
return Err(ApiError::InternalServerError(anyhow!(
|
||||
"couldn't reconcile timeline creation on safekeepers within timeout"
|
||||
)));
|
||||
}
|
||||
}
|
||||
let timeout_or_last =
|
||||
tokio::time::timeout_at(reconcile_deadline, joinset.join_next().map(Option::unwrap))
|
||||
.await;
|
||||
if let Ok(Ok(res)) = timeout_or_last {
|
||||
reconcile_results.push(res);
|
||||
} else {
|
||||
// No error if cancelled or timed out: we already have feedback from a quorum of safekeepers
|
||||
tracing::info!("timeout for third reconciliation");
|
||||
}
|
||||
// check now if quorum was reached in reconcile_results
|
||||
let successful = reconcile_results
|
||||
.into_iter()
|
||||
.filter_map(|res| res.ok())
|
||||
.collect::<Vec<_>>();
|
||||
tracing::info!(
|
||||
"Got {} successful results from reconciliation",
|
||||
successful.len()
|
||||
);
|
||||
let status_kind = if successful.len() < 2 {
|
||||
// Failure
|
||||
return Err(ApiError::InternalServerError(anyhow!(
|
||||
"not enough successful reconciliations to reach quorum, please retry: {}",
|
||||
successful.len()
|
||||
)));
|
||||
} else if successful.len() == 3 {
|
||||
// Success, state of timeline is Created
|
||||
TimelineStatusKind::Created
|
||||
} else if successful.len() == 2 {
|
||||
// Success, state of timeline remains Creating
|
||||
TimelineStatusKind::Creating
|
||||
} else {
|
||||
unreachable!(
|
||||
"unexpected number of successful reconciliations {}",
|
||||
successful.len()
|
||||
);
|
||||
};
|
||||
|
||||
// notify cplane about creation
|
||||
// TODO
|
||||
|
||||
self.persistence
|
||||
.update_timeline_status(tenant_id, timeline_id, status_kind, "{}".to_owned())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn tenant_timeline_create_safekeepers(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_info: &TimelineInfo,
|
||||
create_mode: models::TimelineCreateRequestMode,
|
||||
) -> Result<(SafekeeperGeneration, Vec<NodeId>), ApiError> {
|
||||
let timeline_id = timeline_info.timeline_id;
|
||||
let pg_version = timeline_info.pg_version;
|
||||
let start_lsn = match create_mode {
|
||||
models::TimelineCreateRequestMode::Bootstrap { .. } => timeline_info.last_record_lsn,
|
||||
models::TimelineCreateRequestMode::Branch { .. } => timeline_info.last_record_lsn,
|
||||
models::TimelineCreateRequestMode::ImportPgdata { .. } => {
|
||||
return Err(ApiError::InternalServerError(anyhow!(
|
||||
"import pgdata doesn't specify the start lsn, aborting creation on safekeepers"
|
||||
)))?;
|
||||
}
|
||||
};
|
||||
|
||||
// Choose initial set of safekeepers respecting affinity
|
||||
let sks = self.safekeepers_for_new_timeline().await?;
|
||||
let sks_persistence = sks.iter().map(|sk| sk.0 as i64).collect::<Vec<_>>();
|
||||
let status_creating = TimelineStatusCreating {
|
||||
pg_version,
|
||||
start_lsn,
|
||||
};
|
||||
let status = serde_json::to_string(&status_creating).unwrap();
|
||||
// Add timeline to db
|
||||
let timeline_persist = TimelinePersistence {
|
||||
tenant_id: tenant_id.to_string(),
|
||||
timeline_id: timeline_id.to_string(),
|
||||
generation: 0,
|
||||
sk_set: sks_persistence.clone(),
|
||||
cplane_notified_generation: 0,
|
||||
status_kind: String::from(TimelineStatusKind::Creating),
|
||||
status,
|
||||
};
|
||||
self.persistence
|
||||
.insert_timeline(timeline_persist.clone())
|
||||
.await?;
|
||||
let sk_persistences = self
|
||||
.persistence
|
||||
.list_safekeepers()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|p| (p.id, p))
|
||||
.collect::<HashMap<_, _>>();
|
||||
self.tenant_timeline_create_safekeepers_reconcile(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&timeline_persist,
|
||||
&status_creating,
|
||||
&sk_persistences,
|
||||
)
|
||||
.await?;
|
||||
Ok((SafekeeperGeneration::new(0), sks))
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_timeline_create(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
create_req: TimelineCreateRequest,
|
||||
) -> Result<TimelineCreateResponseStorcon, ApiError> {
|
||||
let safekeepers = create_req.safekeepers.unwrap_or_default();
|
||||
tracing::info!(
|
||||
%safekeepers,
|
||||
"Creating timeline {}/{}",
|
||||
tenant_id,
|
||||
create_req.new_timeline_id,
|
||||
);
|
||||
|
||||
let _tenant_lock = trace_shared_lock(
|
||||
&self.tenant_op_locks,
|
||||
tenant_id,
|
||||
TenantOperations::TimelineCreate,
|
||||
)
|
||||
.await;
|
||||
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
|
||||
let create_mode = create_req.mode.clone();
|
||||
|
||||
let timeline_info = self
|
||||
.tenant_timeline_create_pageservers(tenant_id, create_req)
|
||||
.await?;
|
||||
|
||||
let (safekeepers_generation, safekeepers) = if safekeepers {
|
||||
let res = self
|
||||
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info, create_mode)
|
||||
.await?;
|
||||
(Some(res.0.into_inner()), Some(res.1))
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
|
||||
Ok(TimelineCreateResponseStorcon {
|
||||
timeline_info,
|
||||
safekeepers_generation,
|
||||
safekeepers,
|
||||
})
|
||||
.await?
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_timeline_archival_config(
|
||||
@@ -3870,6 +4120,187 @@ impl Service {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn tenant_timeline_delete_safekeepers_reconcile(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
tl_p: &TimelinePersistence,
|
||||
sk_persistences: &HashMap<i64, SafekeeperPersistence>,
|
||||
) -> Result<(), ApiError> {
|
||||
// If at least one deletion succeeded, return if we are outside of a specified timeout
|
||||
let jwt = self.config.jwt_token.clone().map(SecretString::from);
|
||||
let mut joinset = JoinSet::new();
|
||||
|
||||
let mut members = Vec::new();
|
||||
for sk in tl_p.sk_set.iter() {
|
||||
let Some(sk_p) = sk_persistences.get(sk) else {
|
||||
return Err(ApiError::InternalServerError(anyhow!(
|
||||
"couldn't find persisted entry for safekeeper with id {sk}"
|
||||
)))?;
|
||||
};
|
||||
members.push(SafekeeperId {
|
||||
id: NodeId(sk_p.id as u64),
|
||||
host: sk_p.host.clone(),
|
||||
pg_port: sk_p.port as u16,
|
||||
});
|
||||
}
|
||||
|
||||
let sks_to_reconcile = &tl_p.sk_set;
|
||||
for sk in sks_to_reconcile.iter() {
|
||||
// Unwrap is fine as we already would have returned error above
|
||||
let sk_p = sk_persistences.get(sk).unwrap();
|
||||
let sk_clone = NodeId(*sk as u64);
|
||||
let base_url = sk_p.base_url();
|
||||
let jwt = jwt.clone();
|
||||
let cancel = self.cancel.clone();
|
||||
joinset.spawn(async move {
|
||||
let client = SafekeeperClient::new(sk_clone, base_url, jwt);
|
||||
let retry_result = backoff::retry(
|
||||
|| client.delete_timeline(tenant_id, timeline_id),
|
||||
|_e| {
|
||||
// TODO find right criteria here for deciding on retries
|
||||
false
|
||||
},
|
||||
3,
|
||||
5,
|
||||
"delete timeline on safekeeper",
|
||||
&cancel,
|
||||
)
|
||||
.await;
|
||||
if let Some(res) = retry_result {
|
||||
res.map_err(|e| {
|
||||
ApiError::InternalServerError(
|
||||
anyhow::Error::new(e).context("error deleting timeline on safekeeper"),
|
||||
)
|
||||
})
|
||||
} else {
|
||||
Err(ApiError::Cancelled)
|
||||
}
|
||||
});
|
||||
}
|
||||
// After we have built the joinset, we now wait for the tasks to complete,
|
||||
// but with a specified timeout to make sure we return swiftly, either with
|
||||
// a failure or success.
|
||||
const SK_DELETE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
let reconcile_deadline = tokio::time::Instant::now() + SK_DELETE_TIMELINE_RECONCILE_TIMEOUT;
|
||||
|
||||
// Treat the first task to finish differently, mostly when it times out,
|
||||
// because then we won't have any successful deletion.
|
||||
// For the second and third task, we don't rely on them succeeding, and we need this to support
|
||||
// continuing operations even if a safekeeper is down.
|
||||
let timeout_or_first = tokio::time::timeout_at(reconcile_deadline, async {
|
||||
joinset.join_next().await.unwrap()
|
||||
})
|
||||
.await;
|
||||
let mut reconcile_results = Vec::new();
|
||||
match timeout_or_first {
|
||||
Ok(Ok(res_1)) => {
|
||||
reconcile_results.push(res_1);
|
||||
}
|
||||
Ok(Err(_)) => {
|
||||
return Err(ApiError::InternalServerError(anyhow!(
|
||||
"task was cancelled while reconciling timeline deletion"
|
||||
)));
|
||||
}
|
||||
Err(_) => {
|
||||
return Err(ApiError::InternalServerError(anyhow!(
|
||||
"couldn't reconcile timeline deletion on safekeepers within timeout"
|
||||
)));
|
||||
}
|
||||
}
|
||||
let timeout_or_last = tokio::time::timeout_at(reconcile_deadline, async {
|
||||
while let Some(next_res) = joinset.join_next().await {
|
||||
match next_res {
|
||||
Ok(res) => {
|
||||
reconcile_results.push(res);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::info!("aborting reconciliation due to join error: {e:?}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
if let Err(e) = timeout_or_last.await {
|
||||
// No error if cancelled or timed out: we already have feedback from a quorum of safekeepers
|
||||
tracing::info!(
|
||||
"timeout for last {} reconciliations: {e}",
|
||||
sks_to_reconcile.len() - 1
|
||||
);
|
||||
}
|
||||
// check now if quorum was reached in reconcile_results
|
||||
let successful = reconcile_results
|
||||
.into_iter()
|
||||
.filter_map(|res| res.ok())
|
||||
.collect::<Vec<_>>();
|
||||
tracing::info!(
|
||||
"Got {} successful results from reconciliation",
|
||||
successful.len()
|
||||
);
|
||||
let new_status_kind = if successful.is_empty() {
|
||||
// Failure
|
||||
return Err(ApiError::InternalServerError(anyhow!(
|
||||
"not enough successful reconciliations to reach quorum, please retry: {}",
|
||||
successful.len()
|
||||
)));
|
||||
} else if successful.len() == sks_to_reconcile.len() {
|
||||
// Success, state of timeline is Deleted
|
||||
TimelineStatusKind::Deleted
|
||||
} else if successful.len() == 2 {
|
||||
// Success, state of timeline remains Creating
|
||||
TimelineStatusKind::Deleting
|
||||
} else {
|
||||
unreachable!(
|
||||
"unexpected number of successful reconciliations {}",
|
||||
successful.len()
|
||||
);
|
||||
};
|
||||
|
||||
if new_status_kind == TimelineStatusKind::Deleted {
|
||||
self.persistence
|
||||
.update_timeline_status_deleted(tenant_id, timeline_id)
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
async fn tenant_timeline_delete_safekeepers(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Result<(), ApiError> {
|
||||
let tl = self
|
||||
.persistence
|
||||
.get_timeline(tenant_id, timeline_id)
|
||||
.await?;
|
||||
let Some(tl) = tl else {
|
||||
tracing::info!("timeline {tenant_id}/{timeline_id} doesn't exist in timelines table, no deletions on safekeepers needed");
|
||||
return Ok(());
|
||||
};
|
||||
let status_kind =
|
||||
TimelineStatusKind::from_str(&tl.status_kind).map_err(ApiError::InternalServerError)?;
|
||||
if status_kind != TimelineStatusKind::Deleting {
|
||||
// Set status to deleting
|
||||
let new_status_kind = TimelineStatusKind::Deleting;
|
||||
self.persistence
|
||||
.update_timeline_status(tenant_id, timeline_id, new_status_kind, "{}".to_owned())
|
||||
.await?;
|
||||
}
|
||||
let sk_persistences = self
|
||||
.persistence
|
||||
.list_safekeepers()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|p| (p.id, p))
|
||||
.collect::<HashMap<_, _>>();
|
||||
self.tenant_timeline_delete_safekeepers_reconcile(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&tl,
|
||||
&sk_persistences,
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
pub(crate) async fn tenant_timeline_delete(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
@@ -3883,7 +4314,7 @@ impl Service {
|
||||
)
|
||||
.await;
|
||||
|
||||
self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
|
||||
let ps_fut = self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
|
||||
if targets.0.is_empty() {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant not found").into(),
|
||||
@@ -3955,7 +4386,13 @@ impl Service {
|
||||
)
|
||||
.await?;
|
||||
Ok(shard_zero_status)
|
||||
}).await?
|
||||
});
|
||||
|
||||
let sk_fut = self.tenant_timeline_delete_safekeepers(tenant_id, timeline_id);
|
||||
|
||||
let (ps_res, sk_res) = tokio::join!(ps_fut, sk_fut);
|
||||
sk_res?;
|
||||
ps_res?
|
||||
}
|
||||
|
||||
/// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this
|
||||
@@ -7653,6 +8090,32 @@ impl Service {
|
||||
global_observed
|
||||
}
|
||||
|
||||
pub(crate) async fn safekeepers_for_new_timeline(&self) -> Result<Vec<NodeId>, ApiError> {
|
||||
let mut all_safekeepers = self
|
||||
.persistence
|
||||
.list_safekeepers_with_timeline_count()
|
||||
.await?;
|
||||
all_safekeepers.sort_by_key(|sk| sk.2);
|
||||
let mut sks = Vec::new();
|
||||
let mut azs = HashSet::new();
|
||||
for (sk_id, az_id, _timeline_count) in all_safekeepers.iter() {
|
||||
if !azs.insert(az_id) {
|
||||
continue;
|
||||
}
|
||||
sks.push(*sk_id);
|
||||
if sks.len() == 3 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if sks.len() == 3 {
|
||||
Ok(sks)
|
||||
} else {
|
||||
Err(ApiError::InternalServerError(anyhow!(
|
||||
"couldn't find three safekeepers in different AZs for new timeline"
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn safekeepers_list(
|
||||
&self,
|
||||
) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
|
||||
|
||||
130
storage_controller/src/service/safekeeper_reconciler.rs
Normal file
130
storage_controller/src/service/safekeeper_reconciler.rs
Normal file
@@ -0,0 +1,130 @@
|
||||
use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
|
||||
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Instrument;
|
||||
use utils::{
|
||||
failpoint_support,
|
||||
id::{TenantId, TimelineId},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
id_lock_map::trace_shared_lock,
|
||||
persistence::SafekeeperPersistence,
|
||||
service::{TenantOperations, TimelineStatusCreating, TimelineStatusKind},
|
||||
};
|
||||
|
||||
use super::{Service, TimelinePersistence};
|
||||
|
||||
pub struct SafekeeperReconciler {
|
||||
service: Arc<Service>,
|
||||
duration: Duration,
|
||||
}
|
||||
|
||||
impl SafekeeperReconciler {
|
||||
pub fn new(service: Arc<Service>, duration: Duration) -> Self {
|
||||
SafekeeperReconciler { service, duration }
|
||||
}
|
||||
pub async fn run(&self, cancel: CancellationToken) {
|
||||
while !cancel.is_cancelled() {
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(self.duration) => (),
|
||||
_ = cancel.cancelled() => break,
|
||||
}
|
||||
match self.reconcile_iteration(&cancel).await {
|
||||
Ok(()) => (),
|
||||
Err(e) => {
|
||||
tracing::warn!("Error during safekeeper reconciliation: {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
async fn reconcile_iteration(&self, cancel: &CancellationToken) -> Result<(), anyhow::Error> {
|
||||
let work_list = self
|
||||
.service
|
||||
.persistence
|
||||
.timelines_to_be_reconciled()
|
||||
.await?;
|
||||
if work_list.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
let sk_persistences = self
|
||||
.service
|
||||
.persistence
|
||||
.list_safekeepers()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|p| (p.id, p))
|
||||
.collect::<HashMap<_, _>>();
|
||||
for tl in work_list {
|
||||
let reconcile_fut =
|
||||
self.reconcile_timeline(&tl, &sk_persistences)
|
||||
.instrument(tracing::info_span!(
|
||||
"safekeeper_reconcile_timeline",
|
||||
timeline_id = tl.timeline_id,
|
||||
tenant_id = tl.tenant_id
|
||||
));
|
||||
|
||||
tokio::select! {
|
||||
r = reconcile_fut => r?,
|
||||
_ = cancel.cancelled() => break,
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
async fn reconcile_timeline(
|
||||
&self,
|
||||
tl: &TimelinePersistence,
|
||||
sk_persistences: &HashMap<i64, SafekeeperPersistence>,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
tracing::info!("Reconciling timeline on safekeepers");
|
||||
let tenant_id = TenantId::from_slice(tl.tenant_id.as_bytes())?;
|
||||
let timeline_id = TimelineId::from_slice(tl.timeline_id.as_bytes())?;
|
||||
|
||||
let _tenant_lock = trace_shared_lock(
|
||||
&self.service.tenant_op_locks,
|
||||
tenant_id,
|
||||
TenantOperations::TimelineReconcile,
|
||||
)
|
||||
.await;
|
||||
|
||||
failpoint_support::sleep_millis_async!("safekeeper-reconcile-timeline-shared-lock");
|
||||
// Load the timeline again from the db: unless we hold the tenant lock, the timeline can change under our noses.
|
||||
let tl = self
|
||||
.service
|
||||
.persistence
|
||||
.get_timeline(tenant_id, timeline_id)
|
||||
.await?;
|
||||
let Some(tl) = tl else {
|
||||
// This can happen but is a bit unlikely, so print it on the warn level instead of info
|
||||
tracing::warn!("timeline row in database disappeared");
|
||||
return Ok(());
|
||||
};
|
||||
let status = TimelineStatusKind::from_str(&tl.status)?;
|
||||
match status {
|
||||
TimelineStatusKind::Created | TimelineStatusKind::Deleted => (),
|
||||
TimelineStatusKind::Creating => {
|
||||
let status_creating: TimelineStatusCreating = serde_json::from_str(&tl.status)?;
|
||||
self.service
|
||||
.tenant_timeline_create_safekeepers_reconcile(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&tl,
|
||||
&status_creating,
|
||||
sk_persistences,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
TimelineStatusKind::Deleting => {
|
||||
self.service
|
||||
.tenant_timeline_delete_safekeepers_reconcile(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&tl,
|
||||
sk_persistences,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user