From 6138d61592f0bfbb1d8f3f033d3fec2983ae7936 Mon Sep 17 00:00:00 2001 From: Mikhail Kot Date: Tue, 8 Apr 2025 15:54:53 +0100 Subject: [PATCH] Object storage proxy (#11357) Service targeted for storing and retrieving LFC prewarm data. Can be used for proxying S3 access for Postgres extensions like pg_mooncake as well. Requests must include a Bearer JWT token. Token is validated using a pemfile (should be passed in infra/). Note: app is not tolerant to extra trailing slashes, see app.rs `delete_prefix` test for comments. Resolves: https://github.com/neondatabase/cloud/issues/26342 Unrelated changes: gate a `rename_noreplace` feature and disable it in `remote_storage` so as `object_storage` can be built with musl --- .dockerignore | 1 + Cargo.lock | 55 +- Cargo.toml | 2 + Dockerfile | 2 + control_plane/src/bin/neon_local.rs | 83 ++- control_plane/src/lib.rs | 1 + control_plane/src/local_env.rs | 36 ++ control_plane/src/object_storage.rs | 107 ++++ libs/remote_storage/Cargo.toml | 2 +- libs/utils/Cargo.toml | 5 +- libs/utils/src/fs_ext.rs | 2 + libs/utils/src/fs_ext/rename_noreplace.rs | 4 +- libs/utils/src/signals.rs | 29 + object_storage/Cargo.toml | 28 + object_storage/src/app.rs | 561 ++++++++++++++++++ object_storage/src/lib.rs | 344 +++++++++++ object_storage/src/main.rs | 65 ++ pageserver/src/bin/pageserver.rs | 28 +- test_runner/fixtures/neon_cli.py | 13 + test_runner/fixtures/neon_fixtures.py | 29 + test_runner/regress/test_neon_cli.py | 7 +- test_runner/regress/test_object_storage.py | 56 ++ .../regress/test_storage_controller.py | 2 + 23 files changed, 1424 insertions(+), 38 deletions(-) create mode 100644 control_plane/src/object_storage.rs create mode 100644 object_storage/Cargo.toml create mode 100644 object_storage/src/app.rs create mode 100644 object_storage/src/lib.rs create mode 100644 object_storage/src/main.rs create mode 100644 test_runner/regress/test_object_storage.py diff --git a/.dockerignore b/.dockerignore index 9fafc2e4ba..ffa72eaf51 100644 --- a/.dockerignore +++ b/.dockerignore @@ -19,6 +19,7 @@ !pageserver/ !pgxn/ !proxy/ +!object_storage/ !storage_scrubber/ !safekeeper/ !storage_broker/ diff --git a/Cargo.lock b/Cargo.lock index dbbf2c3357..aea8924f4f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3991,6 +3991,33 @@ dependencies = [ "memchr", ] +[[package]] +name = "object_storage" +version = "0.0.1" +dependencies = [ + "anyhow", + "axum", + "axum-extra", + "camino", + "camino-tempfile", + "futures", + "http-body-util", + "itertools 0.10.5", + "jsonwebtoken", + "prometheus", + "rand 0.8.5", + "remote_storage", + "serde", + "serde_json", + "test-log", + "tokio", + "tokio-util", + "tower 0.5.2", + "tracing", + "utils", + "workspace_hack", +] + [[package]] name = "once_cell" version = "1.20.2" @@ -4693,7 +4720,7 @@ dependencies = [ [[package]] name = "postgres-protocol" version = "0.6.6" -source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#f3cf448febde5fd298071d54d568a9c875a7a62b" dependencies = [ "base64 0.22.1", "byteorder", @@ -4727,7 +4754,7 @@ dependencies = [ [[package]] name = "postgres-types" version = "0.2.6" -source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#f3cf448febde5fd298071d54d568a9c875a7a62b" dependencies = [ "bytes", "chrono", @@ -6925,6 +6952,28 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "test-log" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7f46083d221181166e5b6f6b1e5f1d499f3a76888826e6cb1d057554157cd0f" +dependencies = [ + "env_logger", + "test-log-macros", + "tracing-subscriber", +] + +[[package]] +name = "test-log-macros" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "888d0c3c6db53c0fdab160d2ed5e12ba745383d3e85813f2ea0f2b1475ab553f" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "thiserror" version = "1.0.69" @@ -7172,7 +7221,7 @@ dependencies = [ [[package]] name = "tokio-postgres" version = "0.7.10" -source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#f3cf448febde5fd298071d54d568a9c875a7a62b" dependencies = [ "async-trait", "byteorder", diff --git a/Cargo.toml b/Cargo.toml index 1f605681db..d957fa9070 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ members = [ "libs/proxy/postgres-protocol2", "libs/proxy/postgres-types2", "libs/proxy/tokio-postgres2", + "object_storage", ] [workspace.package] @@ -208,6 +209,7 @@ tracing-opentelemetry = "0.28" tracing-serde = "0.2.0" tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] } try-lock = "0.2.5" +test-log = { version = "0.2.17", default-features = false, features = ["log"] } twox-hash = { version = "1.6.3", default-features = false } typed-json = "0.1" url = "2.2" diff --git a/Dockerfile b/Dockerfile index 01540e1925..848bfab921 100644 --- a/Dockerfile +++ b/Dockerfile @@ -89,6 +89,7 @@ RUN set -e \ --bin storage_broker \ --bin storage_controller \ --bin proxy \ + --bin object_storage \ --bin neon_local \ --bin storage_scrubber \ --locked --release @@ -121,6 +122,7 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker /usr/local/bin COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_controller /usr/local/bin COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin +COPY --from=build --chown=neon:neon /home/nonroot/target/release/object_storage /usr/local/bin COPY --from=build --chown=neon:neon /home/nonroot/target/release/neon_local /usr/local/bin COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_scrubber /usr/local/bin diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 747268f80b..99f0d374c1 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -20,8 +20,10 @@ use compute_api::spec::ComputeMode; use control_plane::endpoint::ComputeControlPlane; use control_plane::local_env::{ InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf, NeonLocalInitPageserverConf, - SafekeeperConf, + ObjectStorageConf, SafekeeperConf, }; +use control_plane::object_storage::OBJECT_STORAGE_DEFAULT_PORT; +use control_plane::object_storage::ObjectStorage; use control_plane::pageserver::PageServerNode; use control_plane::safekeeper::SafekeeperNode; use control_plane::storage_controller::{ @@ -91,6 +93,8 @@ enum NeonLocalCmd { #[command(subcommand)] Safekeeper(SafekeeperCmd), #[command(subcommand)] + ObjectStorage(ObjectStorageCmd), + #[command(subcommand)] Endpoint(EndpointCmd), #[command(subcommand)] Mappings(MappingsCmd), @@ -454,6 +458,32 @@ enum SafekeeperCmd { Restart(SafekeeperRestartCmdArgs), } +#[derive(clap::Subcommand)] +#[clap(about = "Manage object storage")] +enum ObjectStorageCmd { + Start(ObjectStorageStartCmd), + Stop(ObjectStorageStopCmd), +} + +#[derive(clap::Args)] +#[clap(about = "Start object storage")] +struct ObjectStorageStartCmd { + #[clap(short = 't', long, help = "timeout until we fail the command")] + #[arg(default_value = "10s")] + start_timeout: humantime::Duration, +} + +#[derive(clap::Args)] +#[clap(about = "Stop object storage")] +struct ObjectStorageStopCmd { + #[arg(value_enum, default_value = "fast")] + #[clap( + short = 'm', + help = "If 'immediate', don't flush repository data at shutdown" + )] + stop_mode: StopMode, +} + #[derive(clap::Args)] #[clap(about = "Start local safekeeper")] struct SafekeeperStartCmdArgs { @@ -759,6 +789,7 @@ fn main() -> Result<()> { } NeonLocalCmd::StorageBroker(subcmd) => rt.block_on(handle_storage_broker(&subcmd, env)), NeonLocalCmd::Safekeeper(subcmd) => rt.block_on(handle_safekeeper(&subcmd, env)), + NeonLocalCmd::ObjectStorage(subcmd) => rt.block_on(handle_object_storage(&subcmd, env)), NeonLocalCmd::Endpoint(subcmd) => rt.block_on(handle_endpoint(&subcmd, env)), NeonLocalCmd::Mappings(subcmd) => handle_mappings(&subcmd, env), }; @@ -975,6 +1006,9 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result { } }) .collect(), + object_storage: ObjectStorageConf { + port: OBJECT_STORAGE_DEFAULT_PORT, + }, pg_distrib_dir: None, neon_distrib_dir: None, default_tenant_id: TenantId::from_array(std::array::from_fn(|_| 0)), @@ -1683,6 +1717,41 @@ async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) -> Ok(()) } +async fn handle_object_storage(subcmd: &ObjectStorageCmd, env: &local_env::LocalEnv) -> Result<()> { + use ObjectStorageCmd::*; + let storage = ObjectStorage::from_env(env); + + // In tests like test_forward_compatibility or test_graceful_cluster_restart + // old neon binaries (without object_storage) are present + if !storage.bin.exists() { + eprintln!( + "{} binary not found. Ignore if this is a compatibility test", + storage.bin + ); + return Ok(()); + } + + match subcmd { + Start(ObjectStorageStartCmd { start_timeout }) => { + if let Err(e) = storage.start(start_timeout).await { + eprintln!("object_storage start failed: {e}"); + exit(1); + } + } + Stop(ObjectStorageStopCmd { stop_mode }) => { + let immediate = match stop_mode { + StopMode::Fast => false, + StopMode::Immediate => true, + }; + if let Err(e) = storage.stop(immediate) { + eprintln!("proxy stop failed: {e}"); + exit(1); + } + } + }; + Ok(()) +} + async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::LocalEnv) -> Result<()> { match subcmd { StorageBrokerCmd::Start(args) => { @@ -1777,6 +1846,13 @@ async fn handle_start_all_impl( .map_err(|e| e.context(format!("start safekeeper {}", safekeeper.id))) }); } + + js.spawn(async move { + ObjectStorage::from_env(env) + .start(&retry_timeout) + .await + .map_err(|e| e.context("start object_storage")) + }); })(); let mut errors = Vec::new(); @@ -1874,6 +1950,11 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) { } } + let storage = ObjectStorage::from_env(env); + if let Err(e) = storage.stop(immediate) { + eprintln!("object_storage stop failed: {:#}", e); + } + for ps_conf in &env.pageservers { let pageserver = PageServerNode::from_env(env, ps_conf); if let Err(e) = pageserver.stop(immediate) { diff --git a/control_plane/src/lib.rs b/control_plane/src/lib.rs index 2af272f388..2d9fe2c807 100644 --- a/control_plane/src/lib.rs +++ b/control_plane/src/lib.rs @@ -10,6 +10,7 @@ mod background_process; pub mod broker; pub mod endpoint; pub mod local_env; +pub mod object_storage; pub mod pageserver; pub mod postgresql_conf; pub mod safekeeper; diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 3f3794c0ee..2616afbb16 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -18,6 +18,7 @@ use serde::{Deserialize, Serialize}; use utils::auth::{Claims, encode_from_key_file}; use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId}; +use crate::object_storage::{OBJECT_STORAGE_REMOTE_STORAGE_DIR, ObjectStorage}; use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode}; use crate::safekeeper::SafekeeperNode; @@ -55,6 +56,7 @@ pub struct LocalEnv { // used to issue tokens during e.g pg start pub private_key_path: PathBuf, + pub public_key_path: PathBuf, pub broker: NeonBroker, @@ -68,6 +70,8 @@ pub struct LocalEnv { pub safekeepers: Vec, + pub object_storage: ObjectStorageConf, + // Control plane upcall API for pageserver: if None, we will not run storage_controller If set, this will // be propagated into each pageserver's configuration. pub control_plane_api: Url, @@ -95,6 +99,7 @@ pub struct OnDiskConfig { pub neon_distrib_dir: PathBuf, pub default_tenant_id: Option, pub private_key_path: PathBuf, + pub public_key_path: PathBuf, pub broker: NeonBroker, pub storage_controller: NeonStorageControllerConf, #[serde( @@ -103,6 +108,7 @@ pub struct OnDiskConfig { )] pub pageservers: Vec, pub safekeepers: Vec, + pub object_storage: ObjectStorageConf, pub control_plane_api: Option, pub control_plane_hooks_api: Option, pub control_plane_compute_hook_api: Option, @@ -136,11 +142,18 @@ pub struct NeonLocalInitConf { pub storage_controller: Option, pub pageservers: Vec, pub safekeepers: Vec, + pub object_storage: ObjectStorageConf, pub control_plane_api: Option, pub control_plane_hooks_api: Option, pub generate_local_ssl_certs: bool, } +#[derive(Serialize, Default, Deserialize, PartialEq, Eq, Clone, Debug)] +#[serde(default)] +pub struct ObjectStorageConf { + pub port: u16, +} + /// Broker config for cluster internal communication. #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] #[serde(default)] @@ -398,6 +411,10 @@ impl LocalEnv { self.pg_dir(pg_version, "lib") } + pub fn object_storage_bin(&self) -> PathBuf { + self.neon_distrib_dir.join("object_storage") + } + pub fn pageserver_bin(&self) -> PathBuf { self.neon_distrib_dir.join("pageserver") } @@ -431,6 +448,10 @@ impl LocalEnv { self.base_data_dir.join("safekeepers").join(data_dir_name) } + pub fn object_storage_data_dir(&self) -> PathBuf { + self.base_data_dir.join("object_storage") + } + pub fn get_pageserver_conf(&self, id: NodeId) -> anyhow::Result<&PageServerConf> { if let Some(conf) = self.pageservers.iter().find(|node| node.id == id) { Ok(conf) @@ -582,6 +603,7 @@ impl LocalEnv { neon_distrib_dir, default_tenant_id, private_key_path, + public_key_path, broker, storage_controller, pageservers, @@ -591,6 +613,7 @@ impl LocalEnv { control_plane_compute_hook_api: _, branch_name_mappings, generate_local_ssl_certs, + object_storage, } = on_disk_config; LocalEnv { base_data_dir: repopath.to_owned(), @@ -598,6 +621,7 @@ impl LocalEnv { neon_distrib_dir, default_tenant_id, private_key_path, + public_key_path, broker, storage_controller, pageservers, @@ -606,6 +630,7 @@ impl LocalEnv { control_plane_hooks_api, branch_name_mappings, generate_local_ssl_certs, + object_storage, } }; @@ -705,6 +730,7 @@ impl LocalEnv { neon_distrib_dir: self.neon_distrib_dir.clone(), default_tenant_id: self.default_tenant_id, private_key_path: self.private_key_path.clone(), + public_key_path: self.public_key_path.clone(), broker: self.broker.clone(), storage_controller: self.storage_controller.clone(), pageservers: vec![], // it's skip_serializing anyway @@ -714,6 +740,7 @@ impl LocalEnv { control_plane_compute_hook_api: None, branch_name_mappings: self.branch_name_mappings.clone(), generate_local_ssl_certs: self.generate_local_ssl_certs, + object_storage: self.object_storage.clone(), }, ) } @@ -797,6 +824,7 @@ impl LocalEnv { control_plane_api, generate_local_ssl_certs, control_plane_hooks_api, + object_storage, } = conf; // Find postgres binaries. @@ -828,6 +856,7 @@ impl LocalEnv { ) .context("generate auth keys")?; let private_key_path = PathBuf::from("auth_private_key.pem"); + let public_key_path = PathBuf::from("auth_public_key.pem"); // create the runtime type because the remaining initialization code below needs // a LocalEnv instance op operation @@ -838,6 +867,7 @@ impl LocalEnv { neon_distrib_dir, default_tenant_id: Some(default_tenant_id), private_key_path, + public_key_path, broker, storage_controller: storage_controller.unwrap_or_default(), pageservers: pageservers.iter().map(Into::into).collect(), @@ -846,6 +876,7 @@ impl LocalEnv { control_plane_hooks_api, branch_name_mappings: Default::default(), generate_local_ssl_certs, + object_storage, }; if generate_local_ssl_certs { @@ -873,8 +904,13 @@ impl LocalEnv { .context("pageserver init failed")?; } + ObjectStorage::from_env(&env) + .init() + .context("object storage init failed")?; + // setup remote remote location for default LocalFs remote storage std::fs::create_dir_all(env.base_data_dir.join(PAGESERVER_REMOTE_STORAGE_DIR))?; + std::fs::create_dir_all(env.base_data_dir.join(OBJECT_STORAGE_REMOTE_STORAGE_DIR))?; env.persist_config() } diff --git a/control_plane/src/object_storage.rs b/control_plane/src/object_storage.rs new file mode 100644 index 0000000000..1a595b7809 --- /dev/null +++ b/control_plane/src/object_storage.rs @@ -0,0 +1,107 @@ +use crate::background_process::{self, start_process, stop_process}; +use crate::local_env::LocalEnv; +use anyhow::anyhow; +use anyhow::{Context, Result}; +use camino::Utf8PathBuf; +use std::io::Write; +use std::time::Duration; + +/// Directory within .neon which will be used by default for LocalFs remote storage. +pub const OBJECT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/object_storage"; +pub const OBJECT_STORAGE_DEFAULT_PORT: u16 = 9993; + +pub struct ObjectStorage { + pub bin: Utf8PathBuf, + pub data_dir: Utf8PathBuf, + pub pemfile: Utf8PathBuf, + pub port: u16, +} + +impl ObjectStorage { + pub fn from_env(env: &LocalEnv) -> ObjectStorage { + ObjectStorage { + bin: Utf8PathBuf::from_path_buf(env.object_storage_bin()).unwrap(), + data_dir: Utf8PathBuf::from_path_buf(env.object_storage_data_dir()).unwrap(), + pemfile: Utf8PathBuf::from_path_buf(env.public_key_path.clone()).unwrap(), + port: env.object_storage.port, + } + } + + fn config_path(&self) -> Utf8PathBuf { + self.data_dir.join("object_storage.json") + } + + fn listen_addr(&self) -> Utf8PathBuf { + format!("127.0.0.1:{}", self.port).into() + } + + pub fn init(&self) -> Result<()> { + println!("Initializing object storage in {:?}", self.data_dir); + let parent = self.data_dir.parent().unwrap(); + + #[derive(serde::Serialize)] + struct Cfg { + listen: Utf8PathBuf, + pemfile: Utf8PathBuf, + local_path: Utf8PathBuf, + r#type: String, + } + let cfg = Cfg { + listen: self.listen_addr(), + pemfile: parent.join(self.pemfile.clone()), + local_path: parent.join(OBJECT_STORAGE_REMOTE_STORAGE_DIR), + r#type: "LocalFs".to_string(), + }; + std::fs::create_dir_all(self.config_path().parent().unwrap())?; + std::fs::write(self.config_path(), serde_json::to_string(&cfg)?) + .context("write object storage config")?; + Ok(()) + } + + pub async fn start(&self, retry_timeout: &Duration) -> Result<()> { + println!("Starting s3 proxy at {}", self.listen_addr()); + std::io::stdout().flush().context("flush stdout")?; + + let process_status_check = || async { + tokio::time::sleep(Duration::from_millis(500)).await; + let res = reqwest::Client::new() + .get(format!("http://{}/metrics", self.listen_addr())) + .send() + .await; + match res { + Ok(response) if response.status().is_success() => Ok(true), + Ok(_) => Err(anyhow!("Failed to query /metrics")), + Err(e) => Err(anyhow!("Failed to check node status: {e}")), + } + }; + + let res = start_process( + "object_storage", + &self.data_dir.clone().into_std_path_buf(), + &self.bin.clone().into_std_path_buf(), + vec![self.config_path().to_string()], + vec![("RUST_LOG".into(), "debug".into())], + background_process::InitialPidFile::Create(self.pid_file()), + retry_timeout, + process_status_check, + ) + .await; + if res.is_err() { + eprintln!("Logs:\n{}", std::fs::read_to_string(self.log_file())?); + } + + res + } + + pub fn stop(&self, immediate: bool) -> anyhow::Result<()> { + stop_process(immediate, "object_storage", &self.pid_file()) + } + + fn log_file(&self) -> Utf8PathBuf { + self.data_dir.join("object_storage.log") + } + + fn pid_file(&self) -> Utf8PathBuf { + self.data_dir.join("object_storage.pid") + } +} diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml index 7bdf340f74..bd18d80915 100644 --- a/libs/remote_storage/Cargo.toml +++ b/libs/remote_storage/Cargo.toml @@ -28,7 +28,7 @@ toml_edit.workspace = true tracing.workspace = true scopeguard.workspace = true metrics.workspace = true -utils.workspace = true +utils = { path = "../utils", default-features = false } pin-project-lite.workspace = true azure_core.workspace = true diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index 4180602ac7..fd2fa63fd0 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -5,7 +5,8 @@ edition.workspace = true license.workspace = true [features] -default = [] +default = ["rename_noreplace"] +rename_noreplace = [] # Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro, # which adds some runtime cost to run tests on outage conditions testing = ["fail/failpoints"] @@ -35,7 +36,7 @@ serde_with.workspace = true serde_json.workspace = true signal-hook.workspace = true thiserror.workspace = true -tokio.workspace = true +tokio = { workspace = true, features = ["signal"] } tokio-tar.workspace = true tokio-util.workspace = true toml_edit = { workspace = true, features = ["serde"] } diff --git a/libs/utils/src/fs_ext.rs b/libs/utils/src/fs_ext.rs index a406ab0378..e16edaaa9a 100644 --- a/libs/utils/src/fs_ext.rs +++ b/libs/utils/src/fs_ext.rs @@ -3,7 +3,9 @@ use std::{fs, io, path::Path}; use anyhow::Context; +#[cfg(feature = "rename_noreplace")] mod rename_noreplace; +#[cfg(feature = "rename_noreplace")] pub use rename_noreplace::rename_noreplace; pub trait PathExt { diff --git a/libs/utils/src/fs_ext/rename_noreplace.rs b/libs/utils/src/fs_ext/rename_noreplace.rs index fc6f794b57..d0c07353d0 100644 --- a/libs/utils/src/fs_ext/rename_noreplace.rs +++ b/libs/utils/src/fs_ext/rename_noreplace.rs @@ -8,7 +8,7 @@ pub fn rename_noreplace( dst: &P2, ) -> nix::Result<()> { { - #[cfg(target_os = "linux")] + #[cfg(all(target_os = "linux", target_env = "gnu"))] { nix::fcntl::renameat2( None, @@ -29,7 +29,7 @@ pub fn rename_noreplace( })??; nix::errno::Errno::result(res).map(drop) } - #[cfg(not(any(target_os = "linux", target_os = "macos")))] + #[cfg(not(any(all(target_os = "linux", target_env = "gnu"), target_os = "macos")))] { std::compile_error!("OS does not support no-replace renames"); } diff --git a/libs/utils/src/signals.rs b/libs/utils/src/signals.rs index f2be1957c4..426bb65916 100644 --- a/libs/utils/src/signals.rs +++ b/libs/utils/src/signals.rs @@ -1,6 +1,8 @@ pub use signal_hook::consts::TERM_SIGNALS; pub use signal_hook::consts::signal::*; use signal_hook::iterator::Signals; +use tokio::signal::unix::{SignalKind, signal}; +use tracing::info; pub enum Signal { Quit, @@ -36,3 +38,30 @@ impl ShutdownSignals { Ok(()) } } + +/// Runs in a loop since we want to be responsive to multiple signals +/// even after triggering shutdown (e.g. a SIGQUIT after a slow SIGTERM shutdown) +/// +pub async fn signal_handler(token: tokio_util::sync::CancellationToken) { + let mut sigint = signal(SignalKind::interrupt()).unwrap(); + let mut sigterm = signal(SignalKind::terminate()).unwrap(); + let mut sigquit = signal(SignalKind::quit()).unwrap(); + + loop { + let signal = tokio::select! { + _ = sigquit.recv() => { + info!("Got signal SIGQUIT. Terminating in immediate shutdown mode."); + std::process::exit(111); + } + _ = sigint.recv() => "SIGINT", + _ = sigterm.recv() => "SIGTERM", + }; + + if !token.is_cancelled() { + info!("Got signal {signal}. Terminating gracefully in fast shutdown mode."); + token.cancel(); + } else { + info!("Got signal {signal}. Already shutting down."); + } + } +} diff --git a/object_storage/Cargo.toml b/object_storage/Cargo.toml new file mode 100644 index 0000000000..17fbaefe6f --- /dev/null +++ b/object_storage/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "object_storage" +version = "0.0.1" +edition.workspace = true +license.workspace = true +[dependencies] +anyhow.workspace = true +axum-extra.workspace = true +axum.workspace = true +camino.workspace = true +futures.workspace = true +jsonwebtoken.workspace = true +prometheus.workspace = true +remote_storage.workspace = true +serde.workspace = true +serde_json.workspace = true +tokio-util.workspace = true +tokio.workspace = true +tracing.workspace = true +utils = { path = "../libs/utils", default-features = false } +workspace_hack.workspace = true +[dev-dependencies] +camino-tempfile.workspace = true +http-body-util.workspace = true +itertools.workspace = true +rand.workspace = true +test-log.workspace = true +tower.workspace = true diff --git a/object_storage/src/app.rs b/object_storage/src/app.rs new file mode 100644 index 0000000000..7b5627f0db --- /dev/null +++ b/object_storage/src/app.rs @@ -0,0 +1,561 @@ +use anyhow::anyhow; +use axum::body::{Body, Bytes}; +use axum::response::{IntoResponse, Response}; +use axum::{Router, http::StatusCode}; +use object_storage::{PrefixS3Path, S3Path, Storage, bad_request, internal_error, not_found, ok}; +use remote_storage::TimeoutOrCancel; +use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, RemotePath}; +use std::{sync::Arc, time::SystemTime, time::UNIX_EPOCH}; +use tokio_util::sync::CancellationToken; +use tracing::{error, info}; +use utils::backoff::retry; + +pub fn app(state: Arc) -> Router<()> { + use axum::routing::{delete as _delete, get as _get}; + let delete_prefix = _delete(delete_prefix); + Router::new() + .route( + "/{tenant_id}/{timeline_id}/{endpoint_id}/{*path}", + _get(get).put(set).delete(delete), + ) + .route( + "/{tenant_id}/{timeline_id}/{endpoint_id}", + delete_prefix.clone(), + ) + .route("/{tenant_id}/{timeline_id}", delete_prefix.clone()) + .route("/{tenant_id}", delete_prefix) + .route("/metrics", _get(metrics)) + .route("/status", _get(async || StatusCode::OK.into_response())) + .with_state(state) +} + +type Result = anyhow::Result; +type State = axum::extract::State>; + +const CONTENT_TYPE: &str = "content-type"; +const APPLICATION_OCTET_STREAM: &str = "application/octet-stream"; +const WARN_THRESHOLD: u32 = 3; +const MAX_RETRIES: u32 = 10; + +async fn metrics() -> Result { + prometheus::TextEncoder::new() + .encode_to_string(&prometheus::gather()) + .map(|s| s.into_response()) + .map_err(|e| internal_error(e, "/metrics", "collecting metrics")) +} + +async fn get(S3Path { path }: S3Path, state: State) -> Result { + info!(%path, "downloading"); + let download_err = |e| { + if let DownloadError::NotFound = e { + info!(%path, %e, "downloading"); // 404 is not an issue of _this_ service + return not_found(&path); + } + internal_error(e, &path, "downloading") + }; + let cancel = state.cancel.clone(); + let opts = &DownloadOpts::default(); + + let stream = retry( + async || state.storage.download(&path, opts, &cancel).await, + DownloadError::is_permanent, + WARN_THRESHOLD, + MAX_RETRIES, + "downloading", + &cancel, + ) + .await + .unwrap_or(Err(DownloadError::Cancelled)) + .map_err(download_err)? + .download_stream; + + Response::builder() + .status(StatusCode::OK) + .header(CONTENT_TYPE, APPLICATION_OCTET_STREAM) + .body(Body::from_stream(stream)) + .map_err(|e| internal_error(e, path, "reading response")) +} + +// Best solution for files is multipart upload, but remote_storage doesn't support it, +// so we can either read Bytes in memory and push at once or forward BodyDataStream to +// remote_storage. The latter may seem more peformant, but BodyDataStream doesn't have a +// guaranteed size() which may produce issues while uploading to s3. +// So, currently we're going with an in-memory copy plus a boundary to prevent uploading +// very large files. +async fn set(S3Path { path }: S3Path, state: State, bytes: Bytes) -> Result { + info!(%path, "uploading"); + let request_len = bytes.len(); + let max_len = state.max_upload_file_limit; + if request_len > max_len { + return Err(bad_request( + anyhow!("File size {request_len} exceeds max {max_len}"), + "uploading", + )); + } + + let cancel = state.cancel.clone(); + let fun = async || { + let stream = bytes_to_stream(bytes.clone()); + state + .storage + .upload(stream, request_len, &path, None, &cancel) + .await + }; + retry( + fun, + TimeoutOrCancel::caused_by_cancel, + WARN_THRESHOLD, + MAX_RETRIES, + "uploading", + &cancel, + ) + .await + .unwrap_or(Err(anyhow!("uploading cancelled"))) + .map_err(|e| internal_error(e, path, "reading response"))?; + Ok(ok()) +} + +async fn delete(S3Path { path }: S3Path, state: State) -> Result { + info!(%path, "deleting"); + let cancel = state.cancel.clone(); + retry( + async || state.storage.delete(&path, &cancel).await, + TimeoutOrCancel::caused_by_cancel, + WARN_THRESHOLD, + MAX_RETRIES, + "deleting", + &cancel, + ) + .await + .unwrap_or(Err(anyhow!("deleting cancelled"))) + .map_err(|e| internal_error(e, path, "deleting"))?; + Ok(ok()) +} + +async fn delete_prefix(PrefixS3Path { path }: PrefixS3Path, state: State) -> Result { + info!(%path, "deleting prefix"); + let cancel = state.cancel.clone(); + retry( + async || state.storage.delete_prefix(&path, &cancel).await, + TimeoutOrCancel::caused_by_cancel, + WARN_THRESHOLD, + MAX_RETRIES, + "deleting prefix", + &cancel, + ) + .await + .unwrap_or(Err(anyhow!("deleting prefix cancelled"))) + .map_err(|e| internal_error(e, path, "deleting prefix"))?; + Ok(ok()) +} + +pub async fn check_storage_permissions( + client: &GenericRemoteStorage, + cancel: CancellationToken, +) -> anyhow::Result<()> { + info!("storage permissions check"); + + // as_nanos() as multiple instances proxying same bucket may be started at once + let now = SystemTime::now() + .duration_since(UNIX_EPOCH)? + .as_nanos() + .to_string(); + + let path = RemotePath::from_string(&format!("write_access_{now}"))?; + info!(%path, "uploading"); + + let body = now.to_string(); + let stream = bytes_to_stream(Bytes::from(body.clone())); + client + .upload(stream, body.len(), &path, None, &cancel) + .await?; + + use tokio::io::AsyncReadExt; + info!(%path, "downloading"); + let download_opts = DownloadOpts { + kind: remote_storage::DownloadKind::Small, + ..Default::default() + }; + let mut body_read_buf = Vec::new(); + let stream = client + .download(&path, &download_opts, &cancel) + .await? + .download_stream; + tokio_util::io::StreamReader::new(stream) + .read_to_end(&mut body_read_buf) + .await?; + let body_read = String::from_utf8(body_read_buf)?; + if body != body_read { + error!(%body, %body_read, "File contents do not match"); + anyhow::bail!("Read back file doesn't match original") + } + + info!(%path, "removing"); + client.delete(&path, &cancel).await +} + +fn bytes_to_stream(bytes: Bytes) -> impl futures::Stream> { + futures::stream::once(futures::future::ready(Ok(bytes))) +} + +#[cfg(test)] +mod tests { + use super::*; + use axum::{body::Body, extract::Request, response::Response}; + use http_body_util::BodyExt; + use itertools::iproduct; + use std::env::var; + use std::sync::Arc; + use std::time::Duration; + use test_log::test as testlog; + use tower::{Service, util::ServiceExt}; + use utils::id::{TenantId, TimelineId}; + + // see libs/remote_storage/tests/test_real_s3.rs + const REAL_S3_ENV: &str = "ENABLE_REAL_S3_REMOTE_STORAGE"; + const REAL_S3_BUCKET: &str = "REMOTE_STORAGE_S3_BUCKET"; + const REAL_S3_REGION: &str = "REMOTE_STORAGE_S3_REGION"; + + async fn proxy() -> (Storage, Option) { + let cancel = CancellationToken::new(); + let (dir, storage) = if var(REAL_S3_ENV).is_err() { + // tests execute in parallel and we need a new directory for each of them + let dir = camino_tempfile::tempdir().unwrap(); + let fs = + remote_storage::LocalFs::new(dir.path().into(), Duration::from_secs(5)).unwrap(); + (Some(dir), GenericRemoteStorage::LocalFs(fs)) + } else { + // test_real_s3::create_s3_client is hard to reference, reimplementing here + let millis = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis(); + use rand::Rng; + let random = rand::thread_rng().r#gen::(); + + let s3_config = remote_storage::S3Config { + bucket_name: var(REAL_S3_BUCKET).unwrap(), + bucket_region: var(REAL_S3_REGION).unwrap(), + prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")), + endpoint: None, + concurrency_limit: std::num::NonZeroUsize::new(100).unwrap(), + max_keys_per_list_response: None, + upload_storage_class: None, + }; + let bucket = remote_storage::S3Bucket::new(&s3_config, Duration::from_secs(1)) + .await + .unwrap(); + (None, GenericRemoteStorage::AwsS3(Arc::new(bucket))) + }; + + let proxy = Storage { + auth: object_storage::JwtAuth::new(TEST_PUB_KEY_ED25519).unwrap(), + storage, + cancel: cancel.clone(), + max_upload_file_limit: usize::MAX, + }; + check_storage_permissions(&proxy.storage, cancel) + .await + .unwrap(); + (proxy, dir) + } + + // see libs/utils/src/auth.rs + const TEST_PUB_KEY_ED25519: &[u8] = b" +-----BEGIN PUBLIC KEY----- +MCowBQYDK2VwAyEARYwaNBayR+eGI0iXB4s3QxE3Nl2g1iWbr6KtLWeVD/w= +-----END PUBLIC KEY----- +"; + + const TEST_PRIV_KEY_ED25519: &[u8] = br#" +-----BEGIN PRIVATE KEY----- +MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH +-----END PRIVATE KEY----- +"#; + + async fn request(req: Request) -> Response { + let (proxy, _) = proxy().await; + app(Arc::new(proxy)) + .into_service() + .oneshot(req) + .await + .unwrap() + } + + #[testlog(tokio::test)] + async fn status() { + let res = Request::builder() + .uri("/status") + .body(Body::empty()) + .map(request) + .unwrap() + .await; + assert_eq!(res.status(), StatusCode::OK); + } + + fn routes() -> impl Iterator { + iproduct!( + vec!["/1", "/1/2", "/1/2/3", "/1/2/3/4"], + vec!["GET", "PUT", "DELETE"] + ) + } + + #[testlog(tokio::test)] + async fn no_token() { + for (uri, method) in routes() { + info!(%uri, %method); + let res = Request::builder() + .uri(uri) + .method(method) + .body(Body::empty()) + .map(request) + .unwrap() + .await; + assert!(matches!( + res.status(), + StatusCode::METHOD_NOT_ALLOWED | StatusCode::BAD_REQUEST + )); + } + } + + #[testlog(tokio::test)] + async fn invalid_token() { + for (uri, method) in routes() { + info!(%uri, %method); + let status = Request::builder() + .uri(uri) + .header("Authorization", "Bearer 123") + .method(method) + .body(Body::empty()) + .map(request) + .unwrap() + .await; + assert!(matches!( + status.status(), + StatusCode::METHOD_NOT_ALLOWED | StatusCode::BAD_REQUEST + )); + } + } + + const TENANT_ID: TenantId = + TenantId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6]); + const TIMELINE_ID: TimelineId = + TimelineId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]); + const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg"; + fn token() -> String { + let claims = object_storage::Claims { + tenant_id: TENANT_ID, + timeline_id: TIMELINE_ID, + endpoint_id: ENDPOINT_ID.into(), + exp: u64::MAX, + }; + let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap(); + let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO); + jsonwebtoken::encode(&header, &claims, &key).unwrap() + } + + #[testlog(tokio::test)] + async fn unauthorized() { + let (proxy, _) = proxy().await; + let mut app = app(Arc::new(proxy)).into_service(); + let token = token(); + let args = itertools::iproduct!( + vec![TENANT_ID.to_string(), TenantId::generate().to_string()], + vec![TIMELINE_ID.to_string(), TimelineId::generate().to_string()], + vec![ENDPOINT_ID, "ep-ololo"] + ) + .skip(1); + + for ((uri, method), (tenant, timeline, endpoint)) in iproduct!(routes(), args) { + info!(%uri, %method, %tenant, %timeline, %endpoint); + let request = Request::builder() + .uri(format!("/{tenant}/{timeline}/{endpoint}/sub/path/key")) + .method(method) + .header("Authorization", format!("Bearer {}", token)) + .body(Body::empty()) + .unwrap(); + let status = ServiceExt::ready(&mut app) + .await + .unwrap() + .call(request) + .await + .unwrap() + .status(); + assert_eq!(status, StatusCode::UNAUTHORIZED); + } + } + + #[testlog(tokio::test)] + async fn method_not_allowed() { + let token = token(); + let iter = iproduct!(vec!["", "/.."], vec!["GET", "PUT"]); + for (key, method) in iter { + let status = Request::builder() + .uri(format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}{key}")) + .method(method) + .header("Authorization", format!("Bearer {token}")) + .body(Body::empty()) + .map(request) + .unwrap() + .await + .status(); + assert!(matches!( + status, + StatusCode::BAD_REQUEST | StatusCode::METHOD_NOT_ALLOWED + )); + } + } + + async fn requests_chain( + chain: impl Iterator, + token: impl Fn(&str) -> String, + ) { + let (proxy, _) = proxy().await; + let mut app = app(Arc::new(proxy)).into_service(); + for (uri, method, body, expected_status, compare_body) in chain { + info!(%uri, %method, %body, %expected_status); + let bearer = format!("Bearer {}", token(&uri)); + let request = Request::builder() + .uri(uri) + .method(method) + .header("Authorization", &bearer) + .body(Body::from(body)) + .unwrap(); + let response = ServiceExt::ready(&mut app) + .await + .unwrap() + .call(request) + .await + .unwrap(); + assert_eq!(response.status(), expected_status); + if !compare_body { + continue; + } + let read_body = response.into_body().collect().await.unwrap().to_bytes(); + assert_eq!(body, read_body); + } + } + + #[testlog(tokio::test)] + async fn metrics() { + let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/key"); + let req = vec![ + (uri.clone(), "PUT", "body", StatusCode::OK, false), + (uri.clone(), "DELETE", "", StatusCode::OK, false), + ]; + requests_chain(req.into_iter(), |_| token()).await; + + let res = Request::builder() + .uri("/metrics") + .body(Body::empty()) + .map(request) + .unwrap() + .await; + assert_eq!(res.status(), StatusCode::OK); + let body = res.into_body().collect().await.unwrap().to_bytes(); + let body = String::from_utf8_lossy(&body); + tracing::debug!(%body); + // Storage metrics are not gathered for LocalFs + if var(REAL_S3_ENV).is_ok() { + assert!(body.contains("remote_storage_s3_deleted_objects_total")); + } + assert!(body.contains("process_threads")); + } + + #[testlog(tokio::test)] + async fn insert_retrieve_remove() { + let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/key"); + let chain = vec![ + (uri.clone(), "GET", "", StatusCode::NOT_FOUND, false), + (uri.clone(), "PUT", "пыщьпыщь", StatusCode::OK, false), + (uri.clone(), "GET", "пыщьпыщь", StatusCode::OK, true), + (uri.clone(), "DELETE", "", StatusCode::OK, false), + (uri, "GET", "", StatusCode::NOT_FOUND, false), + ]; + requests_chain(chain.into_iter(), |_| token()).await; + } + + fn delete_prefix_token(uri: &str) -> String { + use serde::Serialize; + let parts = uri.split("/").collect::>(); + #[derive(Serialize)] + struct PrefixClaims { + tenant_id: TenantId, + timeline_id: Option, + endpoint_id: Option, + exp: u64, + } + let claims = PrefixClaims { + tenant_id: parts.get(1).map(|c| c.parse().unwrap()).unwrap(), + timeline_id: parts.get(2).map(|c| c.parse().unwrap()), + endpoint_id: parts.get(3).map(ToString::to_string), + exp: u64::MAX, + }; + let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap(); + let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO); + jsonwebtoken::encode(&header, &claims, &key).unwrap() + } + + // Can't use single digit numbers as they won't be validated as TimelineId and EndpointId + #[testlog(tokio::test)] + async fn delete_prefix() { + let tenant_id = + TenantId::from_array([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).to_string(); + let t2 = TimelineId::from_array([2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]); + let t3 = TimelineId::from_array([3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]); + let t4 = TimelineId::from_array([4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]); + let f = |timeline, path| format!("/{tenant_id}/{timeline}{path}"); + // Why extra slash in string literals? Axum is weird with URIs: + // /1/2 and 1/2/ match different routes, thus first yields OK and second NOT_FOUND + // as it matches /tenant/timeline/endpoint, see https://stackoverflow.com/a/75355932 + // The cost of removing trailing slash is suprisingly hard: + // * Add tower dependency with NormalizePath layer + // * wrap Router<()> in this layer https://github.com/tokio-rs/axum/discussions/2377 + // * Rewrite make_service() -> into_make_service() + // * Rewrite oneshot() (not available for NormalizePath) + // I didn't manage to get it working correctly + let chain = vec![ + // create 1/2/3/4, 1/2/3/5, delete prefix 1/2/3 -> empty + (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false), + (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false), // we can override file contents + (f(t2, "/3/5"), "PUT", "", StatusCode::OK, false), + (f(t2, "/3"), "DELETE", "", StatusCode::OK, false), + (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false), + (f(t2, "/3/5"), "GET", "", StatusCode::NOT_FOUND, false), + // create 1/2/3/4, 1/2/5/6, delete prefix 1/2/3 -> 1/2/5/6 + (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false), + (f(t2, "/5/6"), "PUT", "", StatusCode::OK, false), + (f(t2, "/3"), "DELETE", "", StatusCode::OK, false), + (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false), + (f(t2, "/5/6"), "GET", "", StatusCode::OK, false), + // create 1/2/3/4, 1/2/7/8, delete prefix 1/2 -> empty + (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false), + (f(t2, "/7/8"), "PUT", "", StatusCode::OK, false), + (f(t2, ""), "DELETE", "", StatusCode::OK, false), + (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false), + (f(t2, "/7/8"), "GET", "", StatusCode::NOT_FOUND, false), + // create 1/2/3/4, 1/2/5/6, 1/3/8/9, delete prefix 1/2/3 -> 1/2/5/6, 1/3/8/9 + (f(t2, "/3/4"), "PUT", "", StatusCode::OK, false), + (f(t2, "/5/6"), "PUT", "", StatusCode::OK, false), + (f(t3, "/8/9"), "PUT", "", StatusCode::OK, false), + (f(t2, "/3"), "DELETE", "", StatusCode::OK, false), + (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false), + (f(t2, "/5/6"), "GET", "", StatusCode::OK, false), + (f(t3, "/8/9"), "GET", "", StatusCode::OK, false), + // create 1/4/5/6, delete prefix 1/2 -> 1/3/8/9, 1/4/5/6 + (f(t4, "/5/6"), "PUT", "", StatusCode::OK, false), + (f(t2, ""), "DELETE", "", StatusCode::OK, false), + (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false), + (f(t2, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false), + (f(t3, "/8/9"), "GET", "", StatusCode::OK, false), + (f(t4, "/5/6"), "GET", "", StatusCode::OK, false), + // delete prefix 1 -> empty + (format!("/{tenant_id}"), "DELETE", "", StatusCode::OK, false), + (f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false), + (f(t2, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false), + (f(t3, "/8/9"), "GET", "", StatusCode::NOT_FOUND, false), + (f(t4, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false), + ]; + requests_chain(chain.into_iter(), delete_prefix_token).await; + } +} diff --git a/object_storage/src/lib.rs b/object_storage/src/lib.rs new file mode 100644 index 0000000000..989afd4c25 --- /dev/null +++ b/object_storage/src/lib.rs @@ -0,0 +1,344 @@ +use anyhow::Result; +use axum::extract::{FromRequestParts, Path}; +use axum::response::{IntoResponse, Response}; +use axum::{RequestPartsExt, http::StatusCode, http::request::Parts}; +use axum_extra::TypedHeader; +use axum_extra::headers::{Authorization, authorization::Bearer}; +use camino::Utf8PathBuf; +use jsonwebtoken::{DecodingKey, Validation}; +use remote_storage::{GenericRemoteStorage, RemotePath}; +use serde::{Deserialize, Serialize}; +use std::fmt::Display; +use std::result::Result as StdResult; +use std::sync::Arc; +use tokio_util::sync::CancellationToken; +use tracing::{debug, error}; +use utils::id::{TenantId, TimelineId}; + +// simplified version of utils::auth::JwtAuth +pub struct JwtAuth { + decoding_key: DecodingKey, + validation: Validation, +} + +pub const VALIDATION_ALGO: jsonwebtoken::Algorithm = jsonwebtoken::Algorithm::EdDSA; +impl JwtAuth { + pub fn new(key: &[u8]) -> Result { + Ok(Self { + decoding_key: DecodingKey::from_ed_pem(key)?, + validation: Validation::new(VALIDATION_ALGO), + }) + } + + pub fn decode(&self, token: &str) -> Result { + Ok(jsonwebtoken::decode(token, &self.decoding_key, &self.validation).map(|t| t.claims)?) + } +} + +fn normalize_key(key: &str) -> StdResult { + let key = clean_utf8(&Utf8PathBuf::from(key)); + if key.starts_with("..") || key == "." || key == "/" { + return Err(format!("invalid key {key}")); + } + match key.strip_prefix("/").map(Utf8PathBuf::from) { + Ok(p) => Ok(p), + _ => Ok(key), + } +} + +// Copied from path_clean crate with PathBuf->Utf8PathBuf +fn clean_utf8(path: &camino::Utf8Path) -> Utf8PathBuf { + use camino::Utf8Component as Comp; + let mut out = Vec::new(); + for comp in path.components() { + match comp { + Comp::CurDir => (), + Comp::ParentDir => match out.last() { + Some(Comp::RootDir) => (), + Some(Comp::Normal(_)) => { + out.pop(); + } + None | Some(Comp::CurDir) | Some(Comp::ParentDir) | Some(Comp::Prefix(_)) => { + out.push(comp) + } + }, + comp => out.push(comp), + } + } + if !out.is_empty() { + out.iter().collect() + } else { + Utf8PathBuf::from(".") + } +} + +pub struct Storage { + pub auth: JwtAuth, + pub storage: GenericRemoteStorage, + pub cancel: CancellationToken, + pub max_upload_file_limit: usize, +} + +pub type EndpointId = String; // If needed, reuse small string from proxy/src/types.rc + +#[derive(Deserialize, Serialize, PartialEq)] +pub struct Claims { + pub tenant_id: TenantId, + pub timeline_id: TimelineId, + pub endpoint_id: EndpointId, + pub exp: u64, +} + +impl Display for Claims { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Claims(tenant_id {} timeline_id {} endpoint_id {} exp {})", + self.tenant_id, self.timeline_id, self.endpoint_id, self.exp + ) + } +} + +#[derive(Deserialize, Serialize)] +struct KeyRequest { + tenant_id: TenantId, + timeline_id: TimelineId, + endpoint_id: EndpointId, + path: String, +} + +#[derive(Debug, PartialEq)] +pub struct S3Path { + pub path: RemotePath, +} + +impl TryFrom<&KeyRequest> for S3Path { + type Error = String; + fn try_from(req: &KeyRequest) -> StdResult { + let KeyRequest { + tenant_id, + timeline_id, + endpoint_id, + path, + } = &req; + let prefix = format!("{tenant_id}/{timeline_id}/{endpoint_id}",); + let path = Utf8PathBuf::from(prefix).join(normalize_key(path)?); + let path = RemotePath::new(&path).unwrap(); // unwrap() because the path is already relative + Ok(S3Path { path }) + } +} + +fn unauthorized(route: impl Display, claims: impl Display) -> Response { + debug!(%route, %claims, "route doesn't match claims"); + StatusCode::UNAUTHORIZED.into_response() +} + +pub fn bad_request(err: impl Display, desc: &'static str) -> Response { + debug!(%err, desc); + (StatusCode::BAD_REQUEST, err.to_string()).into_response() +} + +pub fn ok() -> Response { + StatusCode::OK.into_response() +} + +pub fn internal_error(err: impl Display, path: impl Display, desc: &'static str) -> Response { + error!(%err, %path, desc); + StatusCode::INTERNAL_SERVER_ERROR.into_response() +} + +pub fn not_found(key: impl ToString) -> Response { + (StatusCode::NOT_FOUND, key.to_string()).into_response() +} + +impl FromRequestParts> for S3Path { + type Rejection = Response; + async fn from_request_parts( + parts: &mut Parts, + state: &Arc, + ) -> Result { + let Path(path): Path = parts + .extract() + .await + .map_err(|e| bad_request(e, "invalid route"))?; + let TypedHeader(Authorization(bearer)) = parts + .extract::>>() + .await + .map_err(|e| bad_request(e, "invalid token"))?; + let claims: Claims = state + .auth + .decode(bearer.token()) + .map_err(|e| bad_request(e, "decoding token"))?; + let route = Claims { + tenant_id: path.tenant_id, + timeline_id: path.timeline_id, + endpoint_id: path.endpoint_id.clone(), + exp: claims.exp, + }; + if route != claims { + return Err(unauthorized(route, claims)); + } + (&path) + .try_into() + .map_err(|e| bad_request(e, "invalid route")) + } +} + +#[derive(Deserialize, Serialize, PartialEq)] +pub struct PrefixKeyPath { + pub tenant_id: TenantId, + pub timeline_id: Option, + pub endpoint_id: Option, +} + +impl Display for PrefixKeyPath { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "PrefixKeyPath(tenant_id {} timeline_id {} endpoint_id {})", + self.tenant_id, + self.timeline_id + .as_ref() + .map(ToString::to_string) + .unwrap_or("".to_string()), + self.endpoint_id + .as_ref() + .map(ToString::to_string) + .unwrap_or("".to_string()) + ) + } +} + +#[derive(Debug, PartialEq)] +pub struct PrefixS3Path { + pub path: RemotePath, +} + +impl From<&PrefixKeyPath> for PrefixS3Path { + fn from(path: &PrefixKeyPath) -> Self { + let timeline_id = path + .timeline_id + .as_ref() + .map(ToString::to_string) + .unwrap_or("".to_string()); + let endpoint_id = path + .endpoint_id + .as_ref() + .map(ToString::to_string) + .unwrap_or("".to_string()); + let path = Utf8PathBuf::from(path.tenant_id.to_string()) + .join(timeline_id) + .join(endpoint_id); + let path = RemotePath::new(&path).unwrap(); // unwrap() because the path is already relative + PrefixS3Path { path } + } +} + +impl FromRequestParts> for PrefixS3Path { + type Rejection = Response; + async fn from_request_parts( + parts: &mut Parts, + state: &Arc, + ) -> Result { + let Path(path) = parts + .extract::>() + .await + .map_err(|e| bad_request(e, "invalid route"))?; + let TypedHeader(Authorization(bearer)) = parts + .extract::>>() + .await + .map_err(|e| bad_request(e, "invalid token"))?; + let claims: PrefixKeyPath = state + .auth + .decode(bearer.token()) + .map_err(|e| bad_request(e, "invalid token"))?; + if path != claims { + return Err(unauthorized(path, claims)); + } + Ok((&path).into()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn normalize_key() { + let f = super::normalize_key; + assert_eq!(f("hello/world/..").unwrap(), Utf8PathBuf::from("hello")); + assert_eq!( + f("ololo/1/../../not_ololo").unwrap(), + Utf8PathBuf::from("not_ololo") + ); + assert!(f("ololo/1/../../../").is_err()); + assert!(f(".").is_err()); + assert!(f("../").is_err()); + assert!(f("").is_err()); + assert_eq!(f("/1/2/3").unwrap(), Utf8PathBuf::from("1/2/3")); + assert!(f("/1/2/3/../../../").is_err()); + assert!(f("/1/2/3/../../../../").is_err()); + } + + const TENANT_ID: TenantId = + TenantId::from_array([1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6]); + const TIMELINE_ID: TimelineId = + TimelineId::from_array([1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]); + const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg"; + + #[test] + fn s3_path() { + let auth = Claims { + tenant_id: TENANT_ID, + timeline_id: TIMELINE_ID, + endpoint_id: ENDPOINT_ID.into(), + exp: u64::MAX, + }; + let s3_path = |key| { + let path = &format!("{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/{key}"); + let path = RemotePath::from_string(path).unwrap(); + S3Path { path } + }; + + let path = "cache_key".to_string(); + let mut key_path = KeyRequest { + path, + tenant_id: auth.tenant_id, + timeline_id: auth.timeline_id, + endpoint_id: auth.endpoint_id, + }; + assert_eq!(S3Path::try_from(&key_path).unwrap(), s3_path(key_path.path)); + + key_path.path = "we/can/have/nested/paths".to_string(); + assert_eq!(S3Path::try_from(&key_path).unwrap(), s3_path(key_path.path)); + + key_path.path = "../error/hello/../".to_string(); + assert!(S3Path::try_from(&key_path).is_err()); + } + + #[test] + fn prefix_s3_path() { + let mut path = PrefixKeyPath { + tenant_id: TENANT_ID, + timeline_id: None, + endpoint_id: None, + }; + let prefix_path = |s: String| RemotePath::from_string(&s).unwrap(); + assert_eq!( + PrefixS3Path::from(&path).path, + prefix_path(format!("{TENANT_ID}")) + ); + + path.timeline_id = Some(TIMELINE_ID); + assert_eq!( + PrefixS3Path::from(&path).path, + prefix_path(format!("{TENANT_ID}/{TIMELINE_ID}")) + ); + + path.endpoint_id = Some(ENDPOINT_ID.into()); + assert_eq!( + PrefixS3Path::from(&path).path, + prefix_path(format!("{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}")) + ); + } +} diff --git a/object_storage/src/main.rs b/object_storage/src/main.rs new file mode 100644 index 0000000000..40325db19d --- /dev/null +++ b/object_storage/src/main.rs @@ -0,0 +1,65 @@ +//! `object_storage` is a service which provides API for uploading and downloading +//! files. It is used by compute and control plane for accessing LFC prewarm data. +//! This service is deployed either as a separate component or as part of compute image +//! for large computes. +mod app; +use anyhow::Context; +use tracing::info; +use utils::logging; + +//see set() +const fn max_upload_file_limit() -> usize { + 100 * 1024 * 1024 +} + +#[derive(serde::Deserialize)] +#[serde(tag = "type")] +struct Config { + listen: std::net::SocketAddr, + pemfile: camino::Utf8PathBuf, + #[serde(flatten)] + storage_config: remote_storage::RemoteStorageConfig, + #[serde(default = "max_upload_file_limit")] + max_upload_file_limit: usize, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + logging::init( + logging::LogFormat::Plain, + logging::TracingErrorLayerEnablement::EnableWithRustLogFilter, + logging::Output::Stdout, + )?; + + let config: String = std::env::args().skip(1).take(1).collect(); + if config.is_empty() { + anyhow::bail!("Usage: object_storage config.json") + } + info!("Reading config from {config}"); + let config = std::fs::read_to_string(config.clone())?; + let config: Config = serde_json::from_str(&config).context("parsing config")?; + info!("Reading pemfile from {}", config.pemfile.clone()); + let pemfile = std::fs::read(config.pemfile.clone())?; + info!("Loading public key from {}", config.pemfile.clone()); + let auth = object_storage::JwtAuth::new(&pemfile)?; + + let listener = tokio::net::TcpListener::bind(config.listen).await.unwrap(); + info!("listening on {}", listener.local_addr().unwrap()); + + let storage = remote_storage::GenericRemoteStorage::from_config(&config.storage_config).await?; + let cancel = tokio_util::sync::CancellationToken::new(); + app::check_storage_permissions(&storage, cancel.clone()).await?; + + let proxy = std::sync::Arc::new(object_storage::Storage { + auth, + storage, + cancel: cancel.clone(), + max_upload_file_limit: config.max_upload_file_limit, + }); + + tokio::spawn(utils::signals::signal_handler(cancel.clone())); + axum::serve(listener, app::app(proxy)) + .with_graceful_shutdown(async move { cancel.cancelled().await }) + .await?; + Ok(()) +} diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 9a8494292d..54fecee588 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -31,7 +31,6 @@ use pageserver::{ }; use postgres_backend::AuthType; use remote_storage::GenericRemoteStorage; -use tokio::signal::unix::SignalKind; use tokio::time::Instant; use tokio_util::sync::CancellationToken; use tracing::*; @@ -744,32 +743,7 @@ fn start_pageserver( let signal_token = CancellationToken::new(); let signal_cancel = signal_token.child_token(); - // Spawn signal handlers. Runs in a loop since we want to be responsive to multiple signals - // even after triggering shutdown (e.g. a SIGQUIT after a slow SIGTERM shutdown). See: - // https://github.com/neondatabase/neon/issues/9740. - tokio::spawn(async move { - let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt()).unwrap(); - let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate()).unwrap(); - let mut sigquit = tokio::signal::unix::signal(SignalKind::quit()).unwrap(); - - loop { - let signal = tokio::select! { - _ = sigquit.recv() => { - info!("Got signal SIGQUIT. Terminating in immediate shutdown mode."); - std::process::exit(111); - } - _ = sigint.recv() => "SIGINT", - _ = sigterm.recv() => "SIGTERM", - }; - - if !signal_token.is_cancelled() { - info!("Got signal {signal}. Terminating gracefully in fast shutdown mode."); - signal_token.cancel(); - } else { - info!("Got signal {signal}. Already shutting down."); - } - } - }); + tokio::spawn(utils::signals::signal_handler(signal_token)); // Wait for cancellation signal and shut down the pageserver. // diff --git a/test_runner/fixtures/neon_cli.py b/test_runner/fixtures/neon_cli.py index d555ee2989..5f5626fb98 100644 --- a/test_runner/fixtures/neon_cli.py +++ b/test_runner/fixtures/neon_cli.py @@ -417,6 +417,19 @@ class NeonLocalCli(AbstractNeonCli): cmd.append(f"--instance-id={instance_id}") return self.raw_cli(cmd) + def object_storage_start(self, timeout_in_seconds: int | None = None): + cmd = ["object-storage", "start"] + if timeout_in_seconds is not None: + cmd.append(f"--start-timeout={timeout_in_seconds}s") + return self.raw_cli(cmd) + + def object_storage_stop(self, immediate: bool): + cmd = ["object-storage", "stop"] + if immediate: + cmd.extend(["-m", "immediate"]) + return self.raw_cli(cmd) + pass + def pageserver_start( self, id: int, diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 5694bf170e..d000dcb69f 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1023,6 +1023,8 @@ class NeonEnvBuilder: self.env.broker.assert_no_errors() + self.env.object_storage.assert_no_errors() + try: self.overlay_cleanup_teardown() except Exception as e: @@ -1118,6 +1120,8 @@ class NeonEnv: pagectl_env_vars["RUST_LOG"] = self.rust_log_override self.pagectl = Pagectl(extra_env=pagectl_env_vars, binpath=self.neon_binpath) + self.object_storage = ObjectStorage(self) + # The URL for the pageserver to use as its control_plane_api config if config.storage_controller_port_override is not None: log.info( @@ -1173,6 +1177,7 @@ class NeonEnv: }, "safekeepers": [], "pageservers": [], + "object_storage": {"port": self.port_distributor.get_port()}, "generate_local_ssl_certs": self.generate_local_ssl_certs, } @@ -1408,6 +1413,8 @@ class NeonEnv: self.storage_controller.on_safekeeper_deploy(sk_id, body) self.storage_controller.safekeeper_scheduling_policy(sk_id, "Active") + self.object_storage.start(timeout_in_seconds=timeout_in_seconds) + def stop(self, immediate=False, ps_assert_metric_no_errors=False, fail_on_endpoint_errors=True): """ After this method returns, there should be no child processes running. @@ -1425,6 +1432,8 @@ class NeonEnv: except Exception as e: raise_later = e + self.object_storage.stop(immediate=immediate) + # Stop storage controller before pageservers: we don't want it to spuriously # detect a pageserver "failure" during test teardown self.storage_controller.stop(immediate=immediate) @@ -2635,6 +2644,26 @@ class NeonStorageController(MetricsGetter, LogUtils): self.stop(immediate=True) +class ObjectStorage(LogUtils): + def __init__(self, env: NeonEnv): + service_dir = env.repo_dir / "object_storage" + super().__init__(logfile=service_dir / "object_storage.log") + self.conf_path = service_dir / "object_storage.json" + self.env = env + + def base_url(self): + return json.loads(self.conf_path.read_text())["listen"] + + def start(self, timeout_in_seconds: int | None = None): + self.env.neon_cli.object_storage_start(timeout_in_seconds) + + def stop(self, immediate: bool = False): + self.env.neon_cli.object_storage_stop(immediate) + + def assert_no_errors(self): + assert_no_errors(self.logfile, "object_storage", []) + + class NeonProxiedStorageController(NeonStorageController): def __init__(self, env: NeonEnv, proxy_port: int, auth_enabled: bool, use_https: bool): super().__init__(env, proxy_port, auth_enabled, use_https) diff --git a/test_runner/regress/test_neon_cli.py b/test_runner/regress/test_neon_cli.py index 8bd0662ef8..e6bcdf8e67 100644 --- a/test_runner/regress/test_neon_cli.py +++ b/test_runner/regress/test_neon_cli.py @@ -134,10 +134,11 @@ def test_cli_start_stop(neon_env_builder: NeonEnvBuilder): """ env = neon_env_builder.init_start() - # Stop default ps/sk + # Stop default services env.neon_cli.pageserver_stop(env.pageserver.id) env.neon_cli.safekeeper_stop() env.neon_cli.storage_controller_stop(False) + env.neon_cli.object_storage_stop(False) env.neon_cli.storage_broker_stop() # Keep NeonEnv state up to date, it usually owns starting/stopping services @@ -179,11 +180,13 @@ def test_cli_start_stop_multi(neon_env_builder: NeonEnvBuilder): # Using the single-pageserver shortcut property throws when there are multiple pageservers with pytest.raises(AssertionError): - _drop = env.pageserver + _ = env.pageserver env.neon_cli.safekeeper_stop(neon_env_builder.safekeepers_id_start + 1) env.neon_cli.safekeeper_stop(neon_env_builder.safekeepers_id_start + 2) + env.neon_cli.object_storage_stop(False) + # Stop this to get out of the way of the following `start` env.neon_cli.storage_controller_stop(False) env.neon_cli.storage_broker_stop() diff --git a/test_runner/regress/test_object_storage.py b/test_runner/regress/test_object_storage.py new file mode 100644 index 0000000000..0b1cfa344f --- /dev/null +++ b/test_runner/regress/test_object_storage.py @@ -0,0 +1,56 @@ +from time import time + +import pytest +from aiohttp import ClientSession +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnv +from jwcrypto import jwk, jwt + + +@pytest.mark.asyncio +async def test_object_storage_insert_retrieve_delete(neon_simple_env: NeonEnv): + """ + Inserts, retrieves, and deletes test file using a JWT token + """ + env = neon_simple_env + ep = env.endpoints.create_start(branch_name="main") + tenant_id = str(ep.tenant_id) + timeline_id = str(ep.show_timeline_id()) + endpoint_id = ep.endpoint_id + + key_path = env.repo_dir / "auth_private_key.pem" + key = jwk.JWK.from_pem(key_path.read_bytes()) + claims = { + "tenant_id": tenant_id, + "timeline_id": timeline_id, + "endpoint_id": endpoint_id, + "exp": round(time()) + 99, + } + log.info(f"key path {key_path}\nclaims {claims}") + token = jwt.JWT(header={"alg": "EdDSA"}, claims=claims) + token.make_signed_token(key) + token = token.serialize() + + base_url = env.object_storage.base_url() + key = f"http://{base_url}/{tenant_id}/{timeline_id}/{endpoint_id}/key" + headers = {"Authorization": f"Bearer {token}"} + log.info(f"cache key url {key}") + log.info(f"token {token}") + + async with ClientSession(headers=headers) as session: + async with session.get(key) as res: + assert res.status == 404, f"Non-existing file is present: {res}" + + data = b"cheburash" + async with session.put(key, data=data) as res: + assert res.status == 200, f"Error writing file: {res}" + + async with session.get(key) as res: + read_data = await res.read() + assert data == read_data + + async with session.delete(key) as res: + assert res.status == 200, f"Error removing file {res}" + + async with session.get(key) as res: + assert res.status == 404, f"File was not deleted: {res}" diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 702f4eeccf..0175794a57 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -95,6 +95,7 @@ def test_storage_controller_smoke( env.pageservers[1].start() for sk in env.safekeepers: sk.start() + env.object_storage.start() # The pageservers we started should have registered with the sharding service on startup nodes = env.storage_controller.node_list() @@ -346,6 +347,7 @@ def prepare_onboarding_env( env = neon_env_builder.init_configs() env.broker.start() env.storage_controller.start() + env.object_storage.start() # This is the pageserver where we'll initially create the tenant. Run it in emergency # mode so that it doesn't talk to storage controller, and do not register it.