diff --git a/.dockerignore b/.dockerignore index ffa72eaf51..4bf1492ea3 100644 --- a/.dockerignore +++ b/.dockerignore @@ -19,7 +19,7 @@ !pageserver/ !pgxn/ !proxy/ -!object_storage/ +!endpoint_storage/ !storage_scrubber/ !safekeeper/ !storage_broker/ diff --git a/Cargo.lock b/Cargo.lock index af5c271686..dd8ea475df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2037,6 +2037,33 @@ dependencies = [ "zeroize", ] +[[package]] +name = "endpoint_storage" +version = "0.0.1" +dependencies = [ + "anyhow", + "axum", + "axum-extra", + "camino", + "camino-tempfile", + "futures", + "http-body-util", + "itertools 0.10.5", + "jsonwebtoken", + "prometheus", + "rand 0.8.5", + "remote_storage", + "serde", + "serde_json", + "test-log", + "tokio", + "tokio-util", + "tower 0.5.2", + "tracing", + "utils", + "workspace_hack", +] + [[package]] name = "enum-map" version = "2.5.0" @@ -3998,33 +4025,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "object_storage" -version = "0.0.1" -dependencies = [ - "anyhow", - "axum", - "axum-extra", - "camino", - "camino-tempfile", - "futures", - "http-body-util", - "itertools 0.10.5", - "jsonwebtoken", - "prometheus", - "rand 0.8.5", - "remote_storage", - "serde", - "serde_json", - "test-log", - "tokio", - "tokio-util", - "tower 0.5.2", - "tracing", - "utils", - "workspace_hack", -] - [[package]] name = "once_cell" version = "1.20.2" diff --git a/Cargo.toml b/Cargo.toml index 9d7904a787..850e04ff2d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,7 +40,7 @@ members = [ "libs/proxy/postgres-protocol2", "libs/proxy/postgres-types2", "libs/proxy/tokio-postgres2", - "object_storage", + "endpoint_storage", ] [workspace.package] diff --git a/Dockerfile b/Dockerfile index 848bfab921..3b7962dcf9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -89,7 +89,7 @@ RUN set -e \ --bin storage_broker \ --bin storage_controller \ --bin proxy \ - --bin object_storage \ + --bin endpoint_storage \ --bin neon_local \ --bin storage_scrubber \ --locked --release @@ -122,7 +122,7 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker /usr/local/bin COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_controller /usr/local/bin COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin -COPY --from=build --chown=neon:neon /home/nonroot/target/release/object_storage /usr/local/bin +COPY --from=build --chown=neon:neon /home/nonroot/target/release/endpoint_storage /usr/local/bin COPY --from=build --chown=neon:neon /home/nonroot/target/release/neon_local /usr/local/bin COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_scrubber /usr/local/bin diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index af0504b957..5cf6767361 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -18,12 +18,11 @@ use anyhow::{Context, Result, anyhow, bail}; use clap::Parser; use compute_api::spec::ComputeMode; use control_plane::endpoint::ComputeControlPlane; +use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_PORT, EndpointStorage}; use control_plane::local_env::{ - InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf, NeonLocalInitPageserverConf, - ObjectStorageConf, SafekeeperConf, + EndpointStorageConf, InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf, + NeonLocalInitPageserverConf, SafekeeperConf, }; -use control_plane::object_storage::OBJECT_STORAGE_DEFAULT_PORT; -use control_plane::object_storage::ObjectStorage; use control_plane::pageserver::PageServerNode; use control_plane::safekeeper::SafekeeperNode; use control_plane::storage_controller::{ @@ -93,7 +92,7 @@ enum NeonLocalCmd { #[command(subcommand)] Safekeeper(SafekeeperCmd), #[command(subcommand)] - ObjectStorage(ObjectStorageCmd), + EndpointStorage(EndpointStorageCmd), #[command(subcommand)] Endpoint(EndpointCmd), #[command(subcommand)] @@ -460,14 +459,14 @@ enum SafekeeperCmd { #[derive(clap::Subcommand)] #[clap(about = "Manage object storage")] -enum ObjectStorageCmd { - Start(ObjectStorageStartCmd), - Stop(ObjectStorageStopCmd), +enum EndpointStorageCmd { + Start(EndpointStorageStartCmd), + Stop(EndpointStorageStopCmd), } #[derive(clap::Args)] #[clap(about = "Start object storage")] -struct ObjectStorageStartCmd { +struct EndpointStorageStartCmd { #[clap(short = 't', long, help = "timeout until we fail the command")] #[arg(default_value = "10s")] start_timeout: humantime::Duration, @@ -475,7 +474,7 @@ struct ObjectStorageStartCmd { #[derive(clap::Args)] #[clap(about = "Stop object storage")] -struct ObjectStorageStopCmd { +struct EndpointStorageStopCmd { #[arg(value_enum, default_value = "fast")] #[clap( short = 'm', @@ -797,7 +796,9 @@ fn main() -> Result<()> { } NeonLocalCmd::StorageBroker(subcmd) => rt.block_on(handle_storage_broker(&subcmd, env)), NeonLocalCmd::Safekeeper(subcmd) => rt.block_on(handle_safekeeper(&subcmd, env)), - NeonLocalCmd::ObjectStorage(subcmd) => rt.block_on(handle_object_storage(&subcmd, env)), + NeonLocalCmd::EndpointStorage(subcmd) => { + rt.block_on(handle_endpoint_storage(&subcmd, env)) + } NeonLocalCmd::Endpoint(subcmd) => rt.block_on(handle_endpoint(&subcmd, env)), NeonLocalCmd::Mappings(subcmd) => handle_mappings(&subcmd, env), }; @@ -1014,8 +1015,8 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result { } }) .collect(), - object_storage: ObjectStorageConf { - port: OBJECT_STORAGE_DEFAULT_PORT, + endpoint_storage: EndpointStorageConf { + port: ENDPOINT_STORAGE_DEFAULT_PORT, }, pg_distrib_dir: None, neon_distrib_dir: None, @@ -1735,12 +1736,15 @@ async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) -> Ok(()) } -async fn handle_object_storage(subcmd: &ObjectStorageCmd, env: &local_env::LocalEnv) -> Result<()> { - use ObjectStorageCmd::*; - let storage = ObjectStorage::from_env(env); +async fn handle_endpoint_storage( + subcmd: &EndpointStorageCmd, + env: &local_env::LocalEnv, +) -> Result<()> { + use EndpointStorageCmd::*; + let storage = EndpointStorage::from_env(env); // In tests like test_forward_compatibility or test_graceful_cluster_restart - // old neon binaries (without object_storage) are present + // old neon binaries (without endpoint_storage) are present if !storage.bin.exists() { eprintln!( "{} binary not found. Ignore if this is a compatibility test", @@ -1750,13 +1754,13 @@ async fn handle_object_storage(subcmd: &ObjectStorageCmd, env: &local_env::Local } match subcmd { - Start(ObjectStorageStartCmd { start_timeout }) => { + Start(EndpointStorageStartCmd { start_timeout }) => { if let Err(e) = storage.start(start_timeout).await { - eprintln!("object_storage start failed: {e}"); + eprintln!("endpoint_storage start failed: {e}"); exit(1); } } - Stop(ObjectStorageStopCmd { stop_mode }) => { + Stop(EndpointStorageStopCmd { stop_mode }) => { let immediate = match stop_mode { StopMode::Fast => false, StopMode::Immediate => true, @@ -1866,10 +1870,10 @@ async fn handle_start_all_impl( } js.spawn(async move { - ObjectStorage::from_env(env) + EndpointStorage::from_env(env) .start(&retry_timeout) .await - .map_err(|e| e.context("start object_storage")) + .map_err(|e| e.context("start endpoint_storage")) }); })(); @@ -1968,9 +1972,9 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) { } } - let storage = ObjectStorage::from_env(env); + let storage = EndpointStorage::from_env(env); if let Err(e) = storage.stop(immediate) { - eprintln!("object_storage stop failed: {:#}", e); + eprintln!("endpoint_storage stop failed: {:#}", e); } for ps_conf in &env.pageservers { diff --git a/control_plane/src/object_storage.rs b/control_plane/src/endpoint_storage.rs similarity index 63% rename from control_plane/src/object_storage.rs rename to control_plane/src/endpoint_storage.rs index 1a595b7809..102db91a22 100644 --- a/control_plane/src/object_storage.rs +++ b/control_plane/src/endpoint_storage.rs @@ -1,34 +1,33 @@ use crate::background_process::{self, start_process, stop_process}; use crate::local_env::LocalEnv; -use anyhow::anyhow; use anyhow::{Context, Result}; use camino::Utf8PathBuf; use std::io::Write; use std::time::Duration; /// Directory within .neon which will be used by default for LocalFs remote storage. -pub const OBJECT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/object_storage"; -pub const OBJECT_STORAGE_DEFAULT_PORT: u16 = 9993; +pub const ENDPOINT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/endpoint_storage"; +pub const ENDPOINT_STORAGE_DEFAULT_PORT: u16 = 9993; -pub struct ObjectStorage { +pub struct EndpointStorage { pub bin: Utf8PathBuf, pub data_dir: Utf8PathBuf, pub pemfile: Utf8PathBuf, pub port: u16, } -impl ObjectStorage { - pub fn from_env(env: &LocalEnv) -> ObjectStorage { - ObjectStorage { - bin: Utf8PathBuf::from_path_buf(env.object_storage_bin()).unwrap(), - data_dir: Utf8PathBuf::from_path_buf(env.object_storage_data_dir()).unwrap(), +impl EndpointStorage { + pub fn from_env(env: &LocalEnv) -> EndpointStorage { + EndpointStorage { + bin: Utf8PathBuf::from_path_buf(env.endpoint_storage_bin()).unwrap(), + data_dir: Utf8PathBuf::from_path_buf(env.endpoint_storage_data_dir()).unwrap(), pemfile: Utf8PathBuf::from_path_buf(env.public_key_path.clone()).unwrap(), - port: env.object_storage.port, + port: env.endpoint_storage.port, } } fn config_path(&self) -> Utf8PathBuf { - self.data_dir.join("object_storage.json") + self.data_dir.join("endpoint_storage.json") } fn listen_addr(&self) -> Utf8PathBuf { @@ -49,7 +48,7 @@ impl ObjectStorage { let cfg = Cfg { listen: self.listen_addr(), pemfile: parent.join(self.pemfile.clone()), - local_path: parent.join(OBJECT_STORAGE_REMOTE_STORAGE_DIR), + local_path: parent.join(ENDPOINT_STORAGE_REMOTE_STORAGE_DIR), r#type: "LocalFs".to_string(), }; std::fs::create_dir_all(self.config_path().parent().unwrap())?; @@ -59,24 +58,19 @@ impl ObjectStorage { } pub async fn start(&self, retry_timeout: &Duration) -> Result<()> { - println!("Starting s3 proxy at {}", self.listen_addr()); + println!("Starting endpoint_storage at {}", self.listen_addr()); std::io::stdout().flush().context("flush stdout")?; let process_status_check = || async { - tokio::time::sleep(Duration::from_millis(500)).await; - let res = reqwest::Client::new() - .get(format!("http://{}/metrics", self.listen_addr())) - .send() - .await; - match res { - Ok(response) if response.status().is_success() => Ok(true), - Ok(_) => Err(anyhow!("Failed to query /metrics")), - Err(e) => Err(anyhow!("Failed to check node status: {e}")), + let res = reqwest::Client::new().get(format!("http://{}/metrics", self.listen_addr())); + match res.send().await { + Ok(res) => Ok(res.status().is_success()), + Err(_) => Ok(false), } }; let res = start_process( - "object_storage", + "endpoint_storage", &self.data_dir.clone().into_std_path_buf(), &self.bin.clone().into_std_path_buf(), vec![self.config_path().to_string()], @@ -94,14 +88,14 @@ impl ObjectStorage { } pub fn stop(&self, immediate: bool) -> anyhow::Result<()> { - stop_process(immediate, "object_storage", &self.pid_file()) + stop_process(immediate, "endpoint_storage", &self.pid_file()) } fn log_file(&self) -> Utf8PathBuf { - self.data_dir.join("object_storage.log") + self.data_dir.join("endpoint_storage.log") } fn pid_file(&self) -> Utf8PathBuf { - self.data_dir.join("object_storage.pid") + self.data_dir.join("endpoint_storage.pid") } } diff --git a/control_plane/src/lib.rs b/control_plane/src/lib.rs index 2d9fe2c807..4619bc0f13 100644 --- a/control_plane/src/lib.rs +++ b/control_plane/src/lib.rs @@ -9,8 +9,8 @@ mod background_process; pub mod broker; pub mod endpoint; +pub mod endpoint_storage; pub mod local_env; -pub mod object_storage; pub mod pageserver; pub mod postgresql_conf; pub mod safekeeper; diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 5e3cf95a31..77d5c1c922 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -19,7 +19,7 @@ use serde::{Deserialize, Serialize}; use utils::auth::encode_from_key_file; use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId}; -use crate::object_storage::{OBJECT_STORAGE_REMOTE_STORAGE_DIR, ObjectStorage}; +use crate::endpoint_storage::{ENDPOINT_STORAGE_REMOTE_STORAGE_DIR, EndpointStorage}; use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode}; use crate::safekeeper::SafekeeperNode; @@ -72,7 +72,7 @@ pub struct LocalEnv { pub safekeepers: Vec, - pub object_storage: ObjectStorageConf, + pub endpoint_storage: EndpointStorageConf, // Control plane upcall API for pageserver: if None, we will not run storage_controller If set, this will // be propagated into each pageserver's configuration. @@ -110,7 +110,7 @@ pub struct OnDiskConfig { )] pub pageservers: Vec, pub safekeepers: Vec, - pub object_storage: ObjectStorageConf, + pub endpoint_storage: EndpointStorageConf, pub control_plane_api: Option, pub control_plane_hooks_api: Option, pub control_plane_compute_hook_api: Option, @@ -144,7 +144,7 @@ pub struct NeonLocalInitConf { pub storage_controller: Option, pub pageservers: Vec, pub safekeepers: Vec, - pub object_storage: ObjectStorageConf, + pub endpoint_storage: EndpointStorageConf, pub control_plane_api: Option, pub control_plane_hooks_api: Option, pub generate_local_ssl_certs: bool, @@ -152,7 +152,7 @@ pub struct NeonLocalInitConf { #[derive(Serialize, Default, Deserialize, PartialEq, Eq, Clone, Debug)] #[serde(default)] -pub struct ObjectStorageConf { +pub struct EndpointStorageConf { pub port: u16, } @@ -413,8 +413,8 @@ impl LocalEnv { self.pg_dir(pg_version, "lib") } - pub fn object_storage_bin(&self) -> PathBuf { - self.neon_distrib_dir.join("object_storage") + pub fn endpoint_storage_bin(&self) -> PathBuf { + self.neon_distrib_dir.join("endpoint_storage") } pub fn pageserver_bin(&self) -> PathBuf { @@ -450,8 +450,8 @@ impl LocalEnv { self.base_data_dir.join("safekeepers").join(data_dir_name) } - pub fn object_storage_data_dir(&self) -> PathBuf { - self.base_data_dir.join("object_storage") + pub fn endpoint_storage_data_dir(&self) -> PathBuf { + self.base_data_dir.join("endpoint_storage") } pub fn get_pageserver_conf(&self, id: NodeId) -> anyhow::Result<&PageServerConf> { @@ -615,7 +615,7 @@ impl LocalEnv { control_plane_compute_hook_api: _, branch_name_mappings, generate_local_ssl_certs, - object_storage, + endpoint_storage, } = on_disk_config; LocalEnv { base_data_dir: repopath.to_owned(), @@ -632,7 +632,7 @@ impl LocalEnv { control_plane_hooks_api, branch_name_mappings, generate_local_ssl_certs, - object_storage, + endpoint_storage, } }; @@ -742,7 +742,7 @@ impl LocalEnv { control_plane_compute_hook_api: None, branch_name_mappings: self.branch_name_mappings.clone(), generate_local_ssl_certs: self.generate_local_ssl_certs, - object_storage: self.object_storage.clone(), + endpoint_storage: self.endpoint_storage.clone(), }, ) } @@ -849,7 +849,7 @@ impl LocalEnv { control_plane_api, generate_local_ssl_certs, control_plane_hooks_api, - object_storage, + endpoint_storage, } = conf; // Find postgres binaries. @@ -901,7 +901,7 @@ impl LocalEnv { control_plane_hooks_api, branch_name_mappings: Default::default(), generate_local_ssl_certs, - object_storage, + endpoint_storage, }; if generate_local_ssl_certs { @@ -929,13 +929,13 @@ impl LocalEnv { .context("pageserver init failed")?; } - ObjectStorage::from_env(&env) + EndpointStorage::from_env(&env) .init() .context("object storage init failed")?; // setup remote remote location for default LocalFs remote storage std::fs::create_dir_all(env.base_data_dir.join(PAGESERVER_REMOTE_STORAGE_DIR))?; - std::fs::create_dir_all(env.base_data_dir.join(OBJECT_STORAGE_REMOTE_STORAGE_DIR))?; + std::fs::create_dir_all(env.base_data_dir.join(ENDPOINT_STORAGE_REMOTE_STORAGE_DIR))?; env.persist_config() } diff --git a/object_storage/Cargo.toml b/endpoint_storage/Cargo.toml similarity index 96% rename from object_storage/Cargo.toml rename to endpoint_storage/Cargo.toml index 17fbaefe6f..b2c9d51551 100644 --- a/object_storage/Cargo.toml +++ b/endpoint_storage/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "object_storage" +name = "endpoint_storage" version = "0.0.1" edition.workspace = true license.workspace = true diff --git a/object_storage/src/app.rs b/endpoint_storage/src/app.rs similarity index 94% rename from object_storage/src/app.rs rename to endpoint_storage/src/app.rs index 7b5627f0db..f07ef06328 100644 --- a/object_storage/src/app.rs +++ b/endpoint_storage/src/app.rs @@ -2,7 +2,7 @@ use anyhow::anyhow; use axum::body::{Body, Bytes}; use axum::response::{IntoResponse, Response}; use axum::{Router, http::StatusCode}; -use object_storage::{PrefixS3Path, S3Path, Storage, bad_request, internal_error, not_found, ok}; +use endpoint_storage::{PrefixS3Path, S3Path, Storage, bad_request, internal_error, not_found, ok}; use remote_storage::TimeoutOrCancel; use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, RemotePath}; use std::{sync::Arc, time::SystemTime, time::UNIX_EPOCH}; @@ -46,12 +46,12 @@ async fn metrics() -> Result { async fn get(S3Path { path }: S3Path, state: State) -> Result { info!(%path, "downloading"); - let download_err = |e| { - if let DownloadError::NotFound = e { - info!(%path, %e, "downloading"); // 404 is not an issue of _this_ service + let download_err = |err| { + if let DownloadError::NotFound = err { + info!(%path, %err, "downloading"); // 404 is not an issue of _this_ service return not_found(&path); } - internal_error(e, &path, "downloading") + internal_error(err, &path, "downloading") }; let cancel = state.cancel.clone(); let opts = &DownloadOpts::default(); @@ -249,7 +249,7 @@ mod tests { }; let proxy = Storage { - auth: object_storage::JwtAuth::new(TEST_PUB_KEY_ED25519).unwrap(), + auth: endpoint_storage::JwtAuth::new(TEST_PUB_KEY_ED25519).unwrap(), storage, cancel: cancel.clone(), max_upload_file_limit: usize::MAX, @@ -343,14 +343,14 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH TimelineId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]); const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg"; fn token() -> String { - let claims = object_storage::Claims { + let claims = endpoint_storage::Claims { tenant_id: TENANT_ID, timeline_id: TIMELINE_ID, endpoint_id: ENDPOINT_ID.into(), exp: u64::MAX, }; let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap(); - let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO); + let header = jsonwebtoken::Header::new(endpoint_storage::VALIDATION_ALGO); jsonwebtoken::encode(&header, &claims, &key).unwrap() } @@ -364,7 +364,10 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH vec![TIMELINE_ID.to_string(), TimelineId::generate().to_string()], vec![ENDPOINT_ID, "ep-ololo"] ) - .skip(1); + // first one is fully valid path, second path is valid for GET as + // read paths may have different endpoint if tenant and timeline matches + // (needed for prewarming RO->RW replica) + .skip(2); for ((uri, method), (tenant, timeline, endpoint)) in iproduct!(routes(), args) { info!(%uri, %method, %tenant, %timeline, %endpoint); @@ -475,6 +478,16 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH requests_chain(chain.into_iter(), |_| token()).await; } + #[testlog(tokio::test)] + async fn read_other_endpoint_data() { + let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/other_endpoint/key"); + let chain = vec![ + (uri.clone(), "GET", "", StatusCode::NOT_FOUND, false), + (uri.clone(), "PUT", "", StatusCode::UNAUTHORIZED, false), + ]; + requests_chain(chain.into_iter(), |_| token()).await; + } + fn delete_prefix_token(uri: &str) -> String { use serde::Serialize; let parts = uri.split("/").collect::>(); @@ -482,7 +495,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH struct PrefixClaims { tenant_id: TenantId, timeline_id: Option, - endpoint_id: Option, + endpoint_id: Option, exp: u64, } let claims = PrefixClaims { @@ -492,7 +505,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH exp: u64::MAX, }; let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap(); - let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO); + let header = jsonwebtoken::Header::new(endpoint_storage::VALIDATION_ALGO); jsonwebtoken::encode(&header, &claims, &key).unwrap() } diff --git a/object_storage/src/lib.rs b/endpoint_storage/src/lib.rs similarity index 96% rename from object_storage/src/lib.rs rename to endpoint_storage/src/lib.rs index 989afd4c25..eb6b80c487 100644 --- a/object_storage/src/lib.rs +++ b/endpoint_storage/src/lib.rs @@ -169,10 +169,19 @@ impl FromRequestParts> for S3Path { .auth .decode(bearer.token()) .map_err(|e| bad_request(e, "decoding token"))?; + + // Read paths may have different endpoint ids. For readonly -> readwrite replica + // prewarming, endpoint must read other endpoint's data. + let endpoint_id = if parts.method == axum::http::Method::GET { + claims.endpoint_id.clone() + } else { + path.endpoint_id.clone() + }; + let route = Claims { tenant_id: path.tenant_id, timeline_id: path.timeline_id, - endpoint_id: path.endpoint_id.clone(), + endpoint_id, exp: claims.exp, }; if route != claims { diff --git a/object_storage/src/main.rs b/endpoint_storage/src/main.rs similarity index 88% rename from object_storage/src/main.rs rename to endpoint_storage/src/main.rs index 40325db19d..3d1f05575d 100644 --- a/object_storage/src/main.rs +++ b/endpoint_storage/src/main.rs @@ -1,4 +1,4 @@ -//! `object_storage` is a service which provides API for uploading and downloading +//! `endpoint_storage` is a service which provides API for uploading and downloading //! files. It is used by compute and control plane for accessing LFC prewarm data. //! This service is deployed either as a separate component or as part of compute image //! for large computes. @@ -33,7 +33,7 @@ async fn main() -> anyhow::Result<()> { let config: String = std::env::args().skip(1).take(1).collect(); if config.is_empty() { - anyhow::bail!("Usage: object_storage config.json") + anyhow::bail!("Usage: endpoint_storage config.json") } info!("Reading config from {config}"); let config = std::fs::read_to_string(config.clone())?; @@ -41,7 +41,7 @@ async fn main() -> anyhow::Result<()> { info!("Reading pemfile from {}", config.pemfile.clone()); let pemfile = std::fs::read(config.pemfile.clone())?; info!("Loading public key from {}", config.pemfile.clone()); - let auth = object_storage::JwtAuth::new(&pemfile)?; + let auth = endpoint_storage::JwtAuth::new(&pemfile)?; let listener = tokio::net::TcpListener::bind(config.listen).await.unwrap(); info!("listening on {}", listener.local_addr().unwrap()); @@ -50,7 +50,7 @@ async fn main() -> anyhow::Result<()> { let cancel = tokio_util::sync::CancellationToken::new(); app::check_storage_permissions(&storage, cancel.clone()).await?; - let proxy = std::sync::Arc::new(object_storage::Storage { + let proxy = std::sync::Arc::new(endpoint_storage::Storage { auth, storage, cancel: cancel.clone(), diff --git a/test_runner/fixtures/neon_cli.py b/test_runner/fixtures/neon_cli.py index 80852b610b..b5d69b5ab6 100644 --- a/test_runner/fixtures/neon_cli.py +++ b/test_runner/fixtures/neon_cli.py @@ -417,14 +417,14 @@ class NeonLocalCli(AbstractNeonCli): cmd.append(f"--instance-id={instance_id}") return self.raw_cli(cmd) - def object_storage_start(self, timeout_in_seconds: int | None = None): - cmd = ["object-storage", "start"] + def endpoint_storage_start(self, timeout_in_seconds: int | None = None): + cmd = ["endpoint-storage", "start"] if timeout_in_seconds is not None: cmd.append(f"--start-timeout={timeout_in_seconds}s") return self.raw_cli(cmd) - def object_storage_stop(self, immediate: bool): - cmd = ["object-storage", "stop"] + def endpoint_storage_stop(self, immediate: bool): + cmd = ["endpoint-storage", "stop"] if immediate: cmd.extend(["-m", "immediate"]) return self.raw_cli(cmd) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index db2b68d082..546b954486 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1029,7 +1029,7 @@ class NeonEnvBuilder: self.env.broker.assert_no_errors() - self.env.object_storage.assert_no_errors() + self.env.endpoint_storage.assert_no_errors() try: self.overlay_cleanup_teardown() @@ -1126,7 +1126,7 @@ class NeonEnv: pagectl_env_vars["RUST_LOG"] = self.rust_log_override self.pagectl = Pagectl(extra_env=pagectl_env_vars, binpath=self.neon_binpath) - self.object_storage = ObjectStorage(self) + self.endpoint_storage = EndpointStorage(self) # The URL for the pageserver to use as its control_plane_api config if config.storage_controller_port_override is not None: @@ -1183,7 +1183,7 @@ class NeonEnv: }, "safekeepers": [], "pageservers": [], - "object_storage": {"port": self.port_distributor.get_port()}, + "endpoint_storage": {"port": self.port_distributor.get_port()}, "generate_local_ssl_certs": self.generate_local_ssl_certs, } @@ -1420,7 +1420,7 @@ class NeonEnv: self.storage_controller.on_safekeeper_deploy(sk_id, body) self.storage_controller.safekeeper_scheduling_policy(sk_id, "Active") - self.object_storage.start(timeout_in_seconds=timeout_in_seconds) + self.endpoint_storage.start(timeout_in_seconds=timeout_in_seconds) def stop(self, immediate=False, ps_assert_metric_no_errors=False, fail_on_endpoint_errors=True): """ @@ -1439,7 +1439,7 @@ class NeonEnv: except Exception as e: raise_later = e - self.object_storage.stop(immediate=immediate) + self.endpoint_storage.stop(immediate=immediate) # Stop storage controller before pageservers: we don't want it to spuriously # detect a pageserver "failure" during test teardown @@ -2660,24 +2660,24 @@ class NeonStorageController(MetricsGetter, LogUtils): self.stop(immediate=True) -class ObjectStorage(LogUtils): +class EndpointStorage(LogUtils): def __init__(self, env: NeonEnv): - service_dir = env.repo_dir / "object_storage" - super().__init__(logfile=service_dir / "object_storage.log") - self.conf_path = service_dir / "object_storage.json" + service_dir = env.repo_dir / "endpoint_storage" + super().__init__(logfile=service_dir / "endpoint_storage.log") + self.conf_path = service_dir / "endpoint_storage.json" self.env = env def base_url(self): return json.loads(self.conf_path.read_text())["listen"] def start(self, timeout_in_seconds: int | None = None): - self.env.neon_cli.object_storage_start(timeout_in_seconds) + self.env.neon_cli.endpoint_storage_start(timeout_in_seconds) def stop(self, immediate: bool = False): - self.env.neon_cli.object_storage_stop(immediate) + self.env.neon_cli.endpoint_storage_stop(immediate) def assert_no_errors(self): - assert_no_errors(self.logfile, "object_storage", []) + assert_no_errors(self.logfile, "endpoint_storage", []) class NeonProxiedStorageController(NeonStorageController): diff --git a/test_runner/regress/test_object_storage.py b/test_runner/regress/test_endpoint_storage.py similarity index 93% rename from test_runner/regress/test_object_storage.py rename to test_runner/regress/test_endpoint_storage.py index 0b1cfa344f..04029114ec 100644 --- a/test_runner/regress/test_object_storage.py +++ b/test_runner/regress/test_endpoint_storage.py @@ -8,7 +8,7 @@ from jwcrypto import jwk, jwt @pytest.mark.asyncio -async def test_object_storage_insert_retrieve_delete(neon_simple_env: NeonEnv): +async def test_endpoint_storage_insert_retrieve_delete(neon_simple_env: NeonEnv): """ Inserts, retrieves, and deletes test file using a JWT token """ @@ -31,7 +31,7 @@ async def test_object_storage_insert_retrieve_delete(neon_simple_env: NeonEnv): token.make_signed_token(key) token = token.serialize() - base_url = env.object_storage.base_url() + base_url = env.endpoint_storage.base_url() key = f"http://{base_url}/{tenant_id}/{timeline_id}/{endpoint_id}/key" headers = {"Authorization": f"Bearer {token}"} log.info(f"cache key url {key}") diff --git a/test_runner/regress/test_neon_cli.py b/test_runner/regress/test_neon_cli.py index e6bcdf8e67..51a38d9f14 100644 --- a/test_runner/regress/test_neon_cli.py +++ b/test_runner/regress/test_neon_cli.py @@ -138,7 +138,7 @@ def test_cli_start_stop(neon_env_builder: NeonEnvBuilder): env.neon_cli.pageserver_stop(env.pageserver.id) env.neon_cli.safekeeper_stop() env.neon_cli.storage_controller_stop(False) - env.neon_cli.object_storage_stop(False) + env.neon_cli.endpoint_storage_stop(False) env.neon_cli.storage_broker_stop() # Keep NeonEnv state up to date, it usually owns starting/stopping services @@ -185,7 +185,7 @@ def test_cli_start_stop_multi(neon_env_builder: NeonEnvBuilder): env.neon_cli.safekeeper_stop(neon_env_builder.safekeepers_id_start + 1) env.neon_cli.safekeeper_stop(neon_env_builder.safekeepers_id_start + 2) - env.neon_cli.object_storage_stop(False) + env.neon_cli.endpoint_storage_stop(False) # Stop this to get out of the way of the following `start` env.neon_cli.storage_controller_stop(False) diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 0f291030fe..a0fa8b55f3 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -95,7 +95,7 @@ def test_storage_controller_smoke( env.pageservers[1].start() for sk in env.safekeepers: sk.start() - env.object_storage.start() + env.endpoint_storage.start() # The pageservers we started should have registered with the sharding service on startup nodes = env.storage_controller.node_list() @@ -347,7 +347,7 @@ def prepare_onboarding_env( env = neon_env_builder.init_configs() env.broker.start() env.storage_controller.start() - env.object_storage.start() + env.endpoint_storage.start() # This is the pageserver where we'll initially create the tenant. Run it in emergency # mode so that it doesn't talk to storage controller, and do not register it.