mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-22 21:59:59 +00:00
Object storage proxy (#11357)
Service targeted for storing and retrieving LFC prewarm data. Can be used for proxying S3 access for Postgres extensions like pg_mooncake as well. Requests must include a Bearer JWT token. Token is validated using a pemfile (should be passed in infra/). Note: app is not tolerant to extra trailing slashes, see app.rs `delete_prefix` test for comments. Resolves: https://github.com/neondatabase/cloud/issues/26342 Unrelated changes: gate a `rename_noreplace` feature and disable it in `remote_storage` so as `object_storage` can be built with musl
This commit is contained in:
@@ -19,6 +19,7 @@
|
||||
!pageserver/
|
||||
!pgxn/
|
||||
!proxy/
|
||||
!object_storage/
|
||||
!storage_scrubber/
|
||||
!safekeeper/
|
||||
!storage_broker/
|
||||
|
||||
55
Cargo.lock
generated
55
Cargo.lock
generated
@@ -3991,6 +3991,33 @@ dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "object_storage"
|
||||
version = "0.0.1"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"axum",
|
||||
"axum-extra",
|
||||
"camino",
|
||||
"camino-tempfile",
|
||||
"futures",
|
||||
"http-body-util",
|
||||
"itertools 0.10.5",
|
||||
"jsonwebtoken",
|
||||
"prometheus",
|
||||
"rand 0.8.5",
|
||||
"remote_storage",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"test-log",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tower 0.5.2",
|
||||
"tracing",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "once_cell"
|
||||
version = "1.20.2"
|
||||
@@ -4693,7 +4720,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-protocol"
|
||||
version = "0.6.6"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#f3cf448febde5fd298071d54d568a9c875a7a62b"
|
||||
dependencies = [
|
||||
"base64 0.22.1",
|
||||
"byteorder",
|
||||
@@ -4727,7 +4754,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-types"
|
||||
version = "0.2.6"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#f3cf448febde5fd298071d54d568a9c875a7a62b"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"chrono",
|
||||
@@ -6925,6 +6952,28 @@ dependencies = [
|
||||
"syn 2.0.100",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "test-log"
|
||||
version = "0.2.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e7f46083d221181166e5b6f6b1e5f1d499f3a76888826e6cb1d057554157cd0f"
|
||||
dependencies = [
|
||||
"env_logger",
|
||||
"test-log-macros",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "test-log-macros"
|
||||
version = "0.2.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "888d0c3c6db53c0fdab160d2ed5e12ba745383d3e85813f2ea0f2b1475ab553f"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.100",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thiserror"
|
||||
version = "1.0.69"
|
||||
@@ -7172,7 +7221,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "tokio-postgres"
|
||||
version = "0.7.10"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#f3cf448febde5fd298071d54d568a9c875a7a62b"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"byteorder",
|
||||
|
||||
@@ -40,6 +40,7 @@ members = [
|
||||
"libs/proxy/postgres-protocol2",
|
||||
"libs/proxy/postgres-types2",
|
||||
"libs/proxy/tokio-postgres2",
|
||||
"object_storage",
|
||||
]
|
||||
|
||||
[workspace.package]
|
||||
@@ -208,6 +209,7 @@ tracing-opentelemetry = "0.28"
|
||||
tracing-serde = "0.2.0"
|
||||
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] }
|
||||
try-lock = "0.2.5"
|
||||
test-log = { version = "0.2.17", default-features = false, features = ["log"] }
|
||||
twox-hash = { version = "1.6.3", default-features = false }
|
||||
typed-json = "0.1"
|
||||
url = "2.2"
|
||||
|
||||
@@ -89,6 +89,7 @@ RUN set -e \
|
||||
--bin storage_broker \
|
||||
--bin storage_controller \
|
||||
--bin proxy \
|
||||
--bin object_storage \
|
||||
--bin neon_local \
|
||||
--bin storage_scrubber \
|
||||
--locked --release
|
||||
@@ -121,6 +122,7 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_controller /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/object_storage /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/neon_local /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_scrubber /usr/local/bin
|
||||
|
||||
|
||||
@@ -20,8 +20,10 @@ use compute_api::spec::ComputeMode;
|
||||
use control_plane::endpoint::ComputeControlPlane;
|
||||
use control_plane::local_env::{
|
||||
InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf, NeonLocalInitPageserverConf,
|
||||
SafekeeperConf,
|
||||
ObjectStorageConf, SafekeeperConf,
|
||||
};
|
||||
use control_plane::object_storage::OBJECT_STORAGE_DEFAULT_PORT;
|
||||
use control_plane::object_storage::ObjectStorage;
|
||||
use control_plane::pageserver::PageServerNode;
|
||||
use control_plane::safekeeper::SafekeeperNode;
|
||||
use control_plane::storage_controller::{
|
||||
@@ -91,6 +93,8 @@ enum NeonLocalCmd {
|
||||
#[command(subcommand)]
|
||||
Safekeeper(SafekeeperCmd),
|
||||
#[command(subcommand)]
|
||||
ObjectStorage(ObjectStorageCmd),
|
||||
#[command(subcommand)]
|
||||
Endpoint(EndpointCmd),
|
||||
#[command(subcommand)]
|
||||
Mappings(MappingsCmd),
|
||||
@@ -454,6 +458,32 @@ enum SafekeeperCmd {
|
||||
Restart(SafekeeperRestartCmdArgs),
|
||||
}
|
||||
|
||||
#[derive(clap::Subcommand)]
|
||||
#[clap(about = "Manage object storage")]
|
||||
enum ObjectStorageCmd {
|
||||
Start(ObjectStorageStartCmd),
|
||||
Stop(ObjectStorageStopCmd),
|
||||
}
|
||||
|
||||
#[derive(clap::Args)]
|
||||
#[clap(about = "Start object storage")]
|
||||
struct ObjectStorageStartCmd {
|
||||
#[clap(short = 't', long, help = "timeout until we fail the command")]
|
||||
#[arg(default_value = "10s")]
|
||||
start_timeout: humantime::Duration,
|
||||
}
|
||||
|
||||
#[derive(clap::Args)]
|
||||
#[clap(about = "Stop object storage")]
|
||||
struct ObjectStorageStopCmd {
|
||||
#[arg(value_enum, default_value = "fast")]
|
||||
#[clap(
|
||||
short = 'm',
|
||||
help = "If 'immediate', don't flush repository data at shutdown"
|
||||
)]
|
||||
stop_mode: StopMode,
|
||||
}
|
||||
|
||||
#[derive(clap::Args)]
|
||||
#[clap(about = "Start local safekeeper")]
|
||||
struct SafekeeperStartCmdArgs {
|
||||
@@ -759,6 +789,7 @@ fn main() -> Result<()> {
|
||||
}
|
||||
NeonLocalCmd::StorageBroker(subcmd) => rt.block_on(handle_storage_broker(&subcmd, env)),
|
||||
NeonLocalCmd::Safekeeper(subcmd) => rt.block_on(handle_safekeeper(&subcmd, env)),
|
||||
NeonLocalCmd::ObjectStorage(subcmd) => rt.block_on(handle_object_storage(&subcmd, env)),
|
||||
NeonLocalCmd::Endpoint(subcmd) => rt.block_on(handle_endpoint(&subcmd, env)),
|
||||
NeonLocalCmd::Mappings(subcmd) => handle_mappings(&subcmd, env),
|
||||
};
|
||||
@@ -975,6 +1006,9 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
|
||||
}
|
||||
})
|
||||
.collect(),
|
||||
object_storage: ObjectStorageConf {
|
||||
port: OBJECT_STORAGE_DEFAULT_PORT,
|
||||
},
|
||||
pg_distrib_dir: None,
|
||||
neon_distrib_dir: None,
|
||||
default_tenant_id: TenantId::from_array(std::array::from_fn(|_| 0)),
|
||||
@@ -1683,6 +1717,41 @@ async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) ->
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_object_storage(subcmd: &ObjectStorageCmd, env: &local_env::LocalEnv) -> Result<()> {
|
||||
use ObjectStorageCmd::*;
|
||||
let storage = ObjectStorage::from_env(env);
|
||||
|
||||
// In tests like test_forward_compatibility or test_graceful_cluster_restart
|
||||
// old neon binaries (without object_storage) are present
|
||||
if !storage.bin.exists() {
|
||||
eprintln!(
|
||||
"{} binary not found. Ignore if this is a compatibility test",
|
||||
storage.bin
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
match subcmd {
|
||||
Start(ObjectStorageStartCmd { start_timeout }) => {
|
||||
if let Err(e) = storage.start(start_timeout).await {
|
||||
eprintln!("object_storage start failed: {e}");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
Stop(ObjectStorageStopCmd { stop_mode }) => {
|
||||
let immediate = match stop_mode {
|
||||
StopMode::Fast => false,
|
||||
StopMode::Immediate => true,
|
||||
};
|
||||
if let Err(e) = storage.stop(immediate) {
|
||||
eprintln!("proxy stop failed: {e}");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::LocalEnv) -> Result<()> {
|
||||
match subcmd {
|
||||
StorageBrokerCmd::Start(args) => {
|
||||
@@ -1777,6 +1846,13 @@ async fn handle_start_all_impl(
|
||||
.map_err(|e| e.context(format!("start safekeeper {}", safekeeper.id)))
|
||||
});
|
||||
}
|
||||
|
||||
js.spawn(async move {
|
||||
ObjectStorage::from_env(env)
|
||||
.start(&retry_timeout)
|
||||
.await
|
||||
.map_err(|e| e.context("start object_storage"))
|
||||
});
|
||||
})();
|
||||
|
||||
let mut errors = Vec::new();
|
||||
@@ -1874,6 +1950,11 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
|
||||
}
|
||||
}
|
||||
|
||||
let storage = ObjectStorage::from_env(env);
|
||||
if let Err(e) = storage.stop(immediate) {
|
||||
eprintln!("object_storage stop failed: {:#}", e);
|
||||
}
|
||||
|
||||
for ps_conf in &env.pageservers {
|
||||
let pageserver = PageServerNode::from_env(env, ps_conf);
|
||||
if let Err(e) = pageserver.stop(immediate) {
|
||||
|
||||
@@ -10,6 +10,7 @@ mod background_process;
|
||||
pub mod broker;
|
||||
pub mod endpoint;
|
||||
pub mod local_env;
|
||||
pub mod object_storage;
|
||||
pub mod pageserver;
|
||||
pub mod postgresql_conf;
|
||||
pub mod safekeeper;
|
||||
|
||||
@@ -18,6 +18,7 @@ use serde::{Deserialize, Serialize};
|
||||
use utils::auth::{Claims, encode_from_key_file};
|
||||
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
|
||||
|
||||
use crate::object_storage::{OBJECT_STORAGE_REMOTE_STORAGE_DIR, ObjectStorage};
|
||||
use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode};
|
||||
use crate::safekeeper::SafekeeperNode;
|
||||
|
||||
@@ -55,6 +56,7 @@ pub struct LocalEnv {
|
||||
|
||||
// used to issue tokens during e.g pg start
|
||||
pub private_key_path: PathBuf,
|
||||
pub public_key_path: PathBuf,
|
||||
|
||||
pub broker: NeonBroker,
|
||||
|
||||
@@ -68,6 +70,8 @@ pub struct LocalEnv {
|
||||
|
||||
pub safekeepers: Vec<SafekeeperConf>,
|
||||
|
||||
pub object_storage: ObjectStorageConf,
|
||||
|
||||
// Control plane upcall API for pageserver: if None, we will not run storage_controller If set, this will
|
||||
// be propagated into each pageserver's configuration.
|
||||
pub control_plane_api: Url,
|
||||
@@ -95,6 +99,7 @@ pub struct OnDiskConfig {
|
||||
pub neon_distrib_dir: PathBuf,
|
||||
pub default_tenant_id: Option<TenantId>,
|
||||
pub private_key_path: PathBuf,
|
||||
pub public_key_path: PathBuf,
|
||||
pub broker: NeonBroker,
|
||||
pub storage_controller: NeonStorageControllerConf,
|
||||
#[serde(
|
||||
@@ -103,6 +108,7 @@ pub struct OnDiskConfig {
|
||||
)]
|
||||
pub pageservers: Vec<PageServerConf>,
|
||||
pub safekeepers: Vec<SafekeeperConf>,
|
||||
pub object_storage: ObjectStorageConf,
|
||||
pub control_plane_api: Option<Url>,
|
||||
pub control_plane_hooks_api: Option<Url>,
|
||||
pub control_plane_compute_hook_api: Option<Url>,
|
||||
@@ -136,11 +142,18 @@ pub struct NeonLocalInitConf {
|
||||
pub storage_controller: Option<NeonStorageControllerConf>,
|
||||
pub pageservers: Vec<NeonLocalInitPageserverConf>,
|
||||
pub safekeepers: Vec<SafekeeperConf>,
|
||||
pub object_storage: ObjectStorageConf,
|
||||
pub control_plane_api: Option<Url>,
|
||||
pub control_plane_hooks_api: Option<Url>,
|
||||
pub generate_local_ssl_certs: bool,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Default, Deserialize, PartialEq, Eq, Clone, Debug)]
|
||||
#[serde(default)]
|
||||
pub struct ObjectStorageConf {
|
||||
pub port: u16,
|
||||
}
|
||||
|
||||
/// Broker config for cluster internal communication.
|
||||
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
|
||||
#[serde(default)]
|
||||
@@ -398,6 +411,10 @@ impl LocalEnv {
|
||||
self.pg_dir(pg_version, "lib")
|
||||
}
|
||||
|
||||
pub fn object_storage_bin(&self) -> PathBuf {
|
||||
self.neon_distrib_dir.join("object_storage")
|
||||
}
|
||||
|
||||
pub fn pageserver_bin(&self) -> PathBuf {
|
||||
self.neon_distrib_dir.join("pageserver")
|
||||
}
|
||||
@@ -431,6 +448,10 @@ impl LocalEnv {
|
||||
self.base_data_dir.join("safekeepers").join(data_dir_name)
|
||||
}
|
||||
|
||||
pub fn object_storage_data_dir(&self) -> PathBuf {
|
||||
self.base_data_dir.join("object_storage")
|
||||
}
|
||||
|
||||
pub fn get_pageserver_conf(&self, id: NodeId) -> anyhow::Result<&PageServerConf> {
|
||||
if let Some(conf) = self.pageservers.iter().find(|node| node.id == id) {
|
||||
Ok(conf)
|
||||
@@ -582,6 +603,7 @@ impl LocalEnv {
|
||||
neon_distrib_dir,
|
||||
default_tenant_id,
|
||||
private_key_path,
|
||||
public_key_path,
|
||||
broker,
|
||||
storage_controller,
|
||||
pageservers,
|
||||
@@ -591,6 +613,7 @@ impl LocalEnv {
|
||||
control_plane_compute_hook_api: _,
|
||||
branch_name_mappings,
|
||||
generate_local_ssl_certs,
|
||||
object_storage,
|
||||
} = on_disk_config;
|
||||
LocalEnv {
|
||||
base_data_dir: repopath.to_owned(),
|
||||
@@ -598,6 +621,7 @@ impl LocalEnv {
|
||||
neon_distrib_dir,
|
||||
default_tenant_id,
|
||||
private_key_path,
|
||||
public_key_path,
|
||||
broker,
|
||||
storage_controller,
|
||||
pageservers,
|
||||
@@ -606,6 +630,7 @@ impl LocalEnv {
|
||||
control_plane_hooks_api,
|
||||
branch_name_mappings,
|
||||
generate_local_ssl_certs,
|
||||
object_storage,
|
||||
}
|
||||
};
|
||||
|
||||
@@ -705,6 +730,7 @@ impl LocalEnv {
|
||||
neon_distrib_dir: self.neon_distrib_dir.clone(),
|
||||
default_tenant_id: self.default_tenant_id,
|
||||
private_key_path: self.private_key_path.clone(),
|
||||
public_key_path: self.public_key_path.clone(),
|
||||
broker: self.broker.clone(),
|
||||
storage_controller: self.storage_controller.clone(),
|
||||
pageservers: vec![], // it's skip_serializing anyway
|
||||
@@ -714,6 +740,7 @@ impl LocalEnv {
|
||||
control_plane_compute_hook_api: None,
|
||||
branch_name_mappings: self.branch_name_mappings.clone(),
|
||||
generate_local_ssl_certs: self.generate_local_ssl_certs,
|
||||
object_storage: self.object_storage.clone(),
|
||||
},
|
||||
)
|
||||
}
|
||||
@@ -797,6 +824,7 @@ impl LocalEnv {
|
||||
control_plane_api,
|
||||
generate_local_ssl_certs,
|
||||
control_plane_hooks_api,
|
||||
object_storage,
|
||||
} = conf;
|
||||
|
||||
// Find postgres binaries.
|
||||
@@ -828,6 +856,7 @@ impl LocalEnv {
|
||||
)
|
||||
.context("generate auth keys")?;
|
||||
let private_key_path = PathBuf::from("auth_private_key.pem");
|
||||
let public_key_path = PathBuf::from("auth_public_key.pem");
|
||||
|
||||
// create the runtime type because the remaining initialization code below needs
|
||||
// a LocalEnv instance op operation
|
||||
@@ -838,6 +867,7 @@ impl LocalEnv {
|
||||
neon_distrib_dir,
|
||||
default_tenant_id: Some(default_tenant_id),
|
||||
private_key_path,
|
||||
public_key_path,
|
||||
broker,
|
||||
storage_controller: storage_controller.unwrap_or_default(),
|
||||
pageservers: pageservers.iter().map(Into::into).collect(),
|
||||
@@ -846,6 +876,7 @@ impl LocalEnv {
|
||||
control_plane_hooks_api,
|
||||
branch_name_mappings: Default::default(),
|
||||
generate_local_ssl_certs,
|
||||
object_storage,
|
||||
};
|
||||
|
||||
if generate_local_ssl_certs {
|
||||
@@ -873,8 +904,13 @@ impl LocalEnv {
|
||||
.context("pageserver init failed")?;
|
||||
}
|
||||
|
||||
ObjectStorage::from_env(&env)
|
||||
.init()
|
||||
.context("object storage init failed")?;
|
||||
|
||||
// setup remote remote location for default LocalFs remote storage
|
||||
std::fs::create_dir_all(env.base_data_dir.join(PAGESERVER_REMOTE_STORAGE_DIR))?;
|
||||
std::fs::create_dir_all(env.base_data_dir.join(OBJECT_STORAGE_REMOTE_STORAGE_DIR))?;
|
||||
|
||||
env.persist_config()
|
||||
}
|
||||
|
||||
107
control_plane/src/object_storage.rs
Normal file
107
control_plane/src/object_storage.rs
Normal file
@@ -0,0 +1,107 @@
|
||||
use crate::background_process::{self, start_process, stop_process};
|
||||
use crate::local_env::LocalEnv;
|
||||
use anyhow::anyhow;
|
||||
use anyhow::{Context, Result};
|
||||
use camino::Utf8PathBuf;
|
||||
use std::io::Write;
|
||||
use std::time::Duration;
|
||||
|
||||
/// Directory within .neon which will be used by default for LocalFs remote storage.
|
||||
pub const OBJECT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/object_storage";
|
||||
pub const OBJECT_STORAGE_DEFAULT_PORT: u16 = 9993;
|
||||
|
||||
pub struct ObjectStorage {
|
||||
pub bin: Utf8PathBuf,
|
||||
pub data_dir: Utf8PathBuf,
|
||||
pub pemfile: Utf8PathBuf,
|
||||
pub port: u16,
|
||||
}
|
||||
|
||||
impl ObjectStorage {
|
||||
pub fn from_env(env: &LocalEnv) -> ObjectStorage {
|
||||
ObjectStorage {
|
||||
bin: Utf8PathBuf::from_path_buf(env.object_storage_bin()).unwrap(),
|
||||
data_dir: Utf8PathBuf::from_path_buf(env.object_storage_data_dir()).unwrap(),
|
||||
pemfile: Utf8PathBuf::from_path_buf(env.public_key_path.clone()).unwrap(),
|
||||
port: env.object_storage.port,
|
||||
}
|
||||
}
|
||||
|
||||
fn config_path(&self) -> Utf8PathBuf {
|
||||
self.data_dir.join("object_storage.json")
|
||||
}
|
||||
|
||||
fn listen_addr(&self) -> Utf8PathBuf {
|
||||
format!("127.0.0.1:{}", self.port).into()
|
||||
}
|
||||
|
||||
pub fn init(&self) -> Result<()> {
|
||||
println!("Initializing object storage in {:?}", self.data_dir);
|
||||
let parent = self.data_dir.parent().unwrap();
|
||||
|
||||
#[derive(serde::Serialize)]
|
||||
struct Cfg {
|
||||
listen: Utf8PathBuf,
|
||||
pemfile: Utf8PathBuf,
|
||||
local_path: Utf8PathBuf,
|
||||
r#type: String,
|
||||
}
|
||||
let cfg = Cfg {
|
||||
listen: self.listen_addr(),
|
||||
pemfile: parent.join(self.pemfile.clone()),
|
||||
local_path: parent.join(OBJECT_STORAGE_REMOTE_STORAGE_DIR),
|
||||
r#type: "LocalFs".to_string(),
|
||||
};
|
||||
std::fs::create_dir_all(self.config_path().parent().unwrap())?;
|
||||
std::fs::write(self.config_path(), serde_json::to_string(&cfg)?)
|
||||
.context("write object storage config")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn start(&self, retry_timeout: &Duration) -> Result<()> {
|
||||
println!("Starting s3 proxy at {}", self.listen_addr());
|
||||
std::io::stdout().flush().context("flush stdout")?;
|
||||
|
||||
let process_status_check = || async {
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
let res = reqwest::Client::new()
|
||||
.get(format!("http://{}/metrics", self.listen_addr()))
|
||||
.send()
|
||||
.await;
|
||||
match res {
|
||||
Ok(response) if response.status().is_success() => Ok(true),
|
||||
Ok(_) => Err(anyhow!("Failed to query /metrics")),
|
||||
Err(e) => Err(anyhow!("Failed to check node status: {e}")),
|
||||
}
|
||||
};
|
||||
|
||||
let res = start_process(
|
||||
"object_storage",
|
||||
&self.data_dir.clone().into_std_path_buf(),
|
||||
&self.bin.clone().into_std_path_buf(),
|
||||
vec![self.config_path().to_string()],
|
||||
vec![("RUST_LOG".into(), "debug".into())],
|
||||
background_process::InitialPidFile::Create(self.pid_file()),
|
||||
retry_timeout,
|
||||
process_status_check,
|
||||
)
|
||||
.await;
|
||||
if res.is_err() {
|
||||
eprintln!("Logs:\n{}", std::fs::read_to_string(self.log_file())?);
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
|
||||
stop_process(immediate, "object_storage", &self.pid_file())
|
||||
}
|
||||
|
||||
fn log_file(&self) -> Utf8PathBuf {
|
||||
self.data_dir.join("object_storage.log")
|
||||
}
|
||||
|
||||
fn pid_file(&self) -> Utf8PathBuf {
|
||||
self.data_dir.join("object_storage.pid")
|
||||
}
|
||||
}
|
||||
@@ -28,7 +28,7 @@ toml_edit.workspace = true
|
||||
tracing.workspace = true
|
||||
scopeguard.workspace = true
|
||||
metrics.workspace = true
|
||||
utils.workspace = true
|
||||
utils = { path = "../utils", default-features = false }
|
||||
pin-project-lite.workspace = true
|
||||
|
||||
azure_core.workspace = true
|
||||
|
||||
@@ -5,7 +5,8 @@ edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[features]
|
||||
default = []
|
||||
default = ["rename_noreplace"]
|
||||
rename_noreplace = []
|
||||
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
|
||||
# which adds some runtime cost to run tests on outage conditions
|
||||
testing = ["fail/failpoints"]
|
||||
@@ -35,7 +36,7 @@ serde_with.workspace = true
|
||||
serde_json.workspace = true
|
||||
signal-hook.workspace = true
|
||||
thiserror.workspace = true
|
||||
tokio.workspace = true
|
||||
tokio = { workspace = true, features = ["signal"] }
|
||||
tokio-tar.workspace = true
|
||||
tokio-util.workspace = true
|
||||
toml_edit = { workspace = true, features = ["serde"] }
|
||||
|
||||
@@ -3,7 +3,9 @@ use std::{fs, io, path::Path};
|
||||
|
||||
use anyhow::Context;
|
||||
|
||||
#[cfg(feature = "rename_noreplace")]
|
||||
mod rename_noreplace;
|
||||
#[cfg(feature = "rename_noreplace")]
|
||||
pub use rename_noreplace::rename_noreplace;
|
||||
|
||||
pub trait PathExt {
|
||||
|
||||
@@ -8,7 +8,7 @@ pub fn rename_noreplace<P1: ?Sized + NixPath, P2: ?Sized + NixPath>(
|
||||
dst: &P2,
|
||||
) -> nix::Result<()> {
|
||||
{
|
||||
#[cfg(target_os = "linux")]
|
||||
#[cfg(all(target_os = "linux", target_env = "gnu"))]
|
||||
{
|
||||
nix::fcntl::renameat2(
|
||||
None,
|
||||
@@ -29,7 +29,7 @@ pub fn rename_noreplace<P1: ?Sized + NixPath, P2: ?Sized + NixPath>(
|
||||
})??;
|
||||
nix::errno::Errno::result(res).map(drop)
|
||||
}
|
||||
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
|
||||
#[cfg(not(any(all(target_os = "linux", target_env = "gnu"), target_os = "macos")))]
|
||||
{
|
||||
std::compile_error!("OS does not support no-replace renames");
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
pub use signal_hook::consts::TERM_SIGNALS;
|
||||
pub use signal_hook::consts::signal::*;
|
||||
use signal_hook::iterator::Signals;
|
||||
use tokio::signal::unix::{SignalKind, signal};
|
||||
use tracing::info;
|
||||
|
||||
pub enum Signal {
|
||||
Quit,
|
||||
@@ -36,3 +38,30 @@ impl ShutdownSignals {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs in a loop since we want to be responsive to multiple signals
|
||||
/// even after triggering shutdown (e.g. a SIGQUIT after a slow SIGTERM shutdown)
|
||||
/// <https://github.com/neondatabase/neon/issues/9740>
|
||||
pub async fn signal_handler(token: tokio_util::sync::CancellationToken) {
|
||||
let mut sigint = signal(SignalKind::interrupt()).unwrap();
|
||||
let mut sigterm = signal(SignalKind::terminate()).unwrap();
|
||||
let mut sigquit = signal(SignalKind::quit()).unwrap();
|
||||
|
||||
loop {
|
||||
let signal = tokio::select! {
|
||||
_ = sigquit.recv() => {
|
||||
info!("Got signal SIGQUIT. Terminating in immediate shutdown mode.");
|
||||
std::process::exit(111);
|
||||
}
|
||||
_ = sigint.recv() => "SIGINT",
|
||||
_ = sigterm.recv() => "SIGTERM",
|
||||
};
|
||||
|
||||
if !token.is_cancelled() {
|
||||
info!("Got signal {signal}. Terminating gracefully in fast shutdown mode.");
|
||||
token.cancel();
|
||||
} else {
|
||||
info!("Got signal {signal}. Already shutting down.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
28
object_storage/Cargo.toml
Normal file
28
object_storage/Cargo.toml
Normal file
@@ -0,0 +1,28 @@
|
||||
[package]
|
||||
name = "object_storage"
|
||||
version = "0.0.1"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
axum-extra.workspace = true
|
||||
axum.workspace = true
|
||||
camino.workspace = true
|
||||
futures.workspace = true
|
||||
jsonwebtoken.workspace = true
|
||||
prometheus.workspace = true
|
||||
remote_storage.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tokio.workspace = true
|
||||
tracing.workspace = true
|
||||
utils = { path = "../libs/utils", default-features = false }
|
||||
workspace_hack.workspace = true
|
||||
[dev-dependencies]
|
||||
camino-tempfile.workspace = true
|
||||
http-body-util.workspace = true
|
||||
itertools.workspace = true
|
||||
rand.workspace = true
|
||||
test-log.workspace = true
|
||||
tower.workspace = true
|
||||
561
object_storage/src/app.rs
Normal file
561
object_storage/src/app.rs
Normal file
@@ -0,0 +1,561 @@
|
||||
use anyhow::anyhow;
|
||||
use axum::body::{Body, Bytes};
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum::{Router, http::StatusCode};
|
||||
use object_storage::{PrefixS3Path, S3Path, Storage, bad_request, internal_error, not_found, ok};
|
||||
use remote_storage::TimeoutOrCancel;
|
||||
use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, RemotePath};
|
||||
use std::{sync::Arc, time::SystemTime, time::UNIX_EPOCH};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info};
|
||||
use utils::backoff::retry;
|
||||
|
||||
pub fn app(state: Arc<Storage>) -> Router<()> {
|
||||
use axum::routing::{delete as _delete, get as _get};
|
||||
let delete_prefix = _delete(delete_prefix);
|
||||
Router::new()
|
||||
.route(
|
||||
"/{tenant_id}/{timeline_id}/{endpoint_id}/{*path}",
|
||||
_get(get).put(set).delete(delete),
|
||||
)
|
||||
.route(
|
||||
"/{tenant_id}/{timeline_id}/{endpoint_id}",
|
||||
delete_prefix.clone(),
|
||||
)
|
||||
.route("/{tenant_id}/{timeline_id}", delete_prefix.clone())
|
||||
.route("/{tenant_id}", delete_prefix)
|
||||
.route("/metrics", _get(metrics))
|
||||
.route("/status", _get(async || StatusCode::OK.into_response()))
|
||||
.with_state(state)
|
||||
}
|
||||
|
||||
type Result = anyhow::Result<Response, Response>;
|
||||
type State = axum::extract::State<Arc<Storage>>;
|
||||
|
||||
const CONTENT_TYPE: &str = "content-type";
|
||||
const APPLICATION_OCTET_STREAM: &str = "application/octet-stream";
|
||||
const WARN_THRESHOLD: u32 = 3;
|
||||
const MAX_RETRIES: u32 = 10;
|
||||
|
||||
async fn metrics() -> Result {
|
||||
prometheus::TextEncoder::new()
|
||||
.encode_to_string(&prometheus::gather())
|
||||
.map(|s| s.into_response())
|
||||
.map_err(|e| internal_error(e, "/metrics", "collecting metrics"))
|
||||
}
|
||||
|
||||
async fn get(S3Path { path }: S3Path, state: State) -> Result {
|
||||
info!(%path, "downloading");
|
||||
let download_err = |e| {
|
||||
if let DownloadError::NotFound = e {
|
||||
info!(%path, %e, "downloading"); // 404 is not an issue of _this_ service
|
||||
return not_found(&path);
|
||||
}
|
||||
internal_error(e, &path, "downloading")
|
||||
};
|
||||
let cancel = state.cancel.clone();
|
||||
let opts = &DownloadOpts::default();
|
||||
|
||||
let stream = retry(
|
||||
async || state.storage.download(&path, opts, &cancel).await,
|
||||
DownloadError::is_permanent,
|
||||
WARN_THRESHOLD,
|
||||
MAX_RETRIES,
|
||||
"downloading",
|
||||
&cancel,
|
||||
)
|
||||
.await
|
||||
.unwrap_or(Err(DownloadError::Cancelled))
|
||||
.map_err(download_err)?
|
||||
.download_stream;
|
||||
|
||||
Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
|
||||
.body(Body::from_stream(stream))
|
||||
.map_err(|e| internal_error(e, path, "reading response"))
|
||||
}
|
||||
|
||||
// Best solution for files is multipart upload, but remote_storage doesn't support it,
|
||||
// so we can either read Bytes in memory and push at once or forward BodyDataStream to
|
||||
// remote_storage. The latter may seem more peformant, but BodyDataStream doesn't have a
|
||||
// guaranteed size() which may produce issues while uploading to s3.
|
||||
// So, currently we're going with an in-memory copy plus a boundary to prevent uploading
|
||||
// very large files.
|
||||
async fn set(S3Path { path }: S3Path, state: State, bytes: Bytes) -> Result {
|
||||
info!(%path, "uploading");
|
||||
let request_len = bytes.len();
|
||||
let max_len = state.max_upload_file_limit;
|
||||
if request_len > max_len {
|
||||
return Err(bad_request(
|
||||
anyhow!("File size {request_len} exceeds max {max_len}"),
|
||||
"uploading",
|
||||
));
|
||||
}
|
||||
|
||||
let cancel = state.cancel.clone();
|
||||
let fun = async || {
|
||||
let stream = bytes_to_stream(bytes.clone());
|
||||
state
|
||||
.storage
|
||||
.upload(stream, request_len, &path, None, &cancel)
|
||||
.await
|
||||
};
|
||||
retry(
|
||||
fun,
|
||||
TimeoutOrCancel::caused_by_cancel,
|
||||
WARN_THRESHOLD,
|
||||
MAX_RETRIES,
|
||||
"uploading",
|
||||
&cancel,
|
||||
)
|
||||
.await
|
||||
.unwrap_or(Err(anyhow!("uploading cancelled")))
|
||||
.map_err(|e| internal_error(e, path, "reading response"))?;
|
||||
Ok(ok())
|
||||
}
|
||||
|
||||
async fn delete(S3Path { path }: S3Path, state: State) -> Result {
|
||||
info!(%path, "deleting");
|
||||
let cancel = state.cancel.clone();
|
||||
retry(
|
||||
async || state.storage.delete(&path, &cancel).await,
|
||||
TimeoutOrCancel::caused_by_cancel,
|
||||
WARN_THRESHOLD,
|
||||
MAX_RETRIES,
|
||||
"deleting",
|
||||
&cancel,
|
||||
)
|
||||
.await
|
||||
.unwrap_or(Err(anyhow!("deleting cancelled")))
|
||||
.map_err(|e| internal_error(e, path, "deleting"))?;
|
||||
Ok(ok())
|
||||
}
|
||||
|
||||
async fn delete_prefix(PrefixS3Path { path }: PrefixS3Path, state: State) -> Result {
|
||||
info!(%path, "deleting prefix");
|
||||
let cancel = state.cancel.clone();
|
||||
retry(
|
||||
async || state.storage.delete_prefix(&path, &cancel).await,
|
||||
TimeoutOrCancel::caused_by_cancel,
|
||||
WARN_THRESHOLD,
|
||||
MAX_RETRIES,
|
||||
"deleting prefix",
|
||||
&cancel,
|
||||
)
|
||||
.await
|
||||
.unwrap_or(Err(anyhow!("deleting prefix cancelled")))
|
||||
.map_err(|e| internal_error(e, path, "deleting prefix"))?;
|
||||
Ok(ok())
|
||||
}
|
||||
|
||||
pub async fn check_storage_permissions(
|
||||
client: &GenericRemoteStorage,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
info!("storage permissions check");
|
||||
|
||||
// as_nanos() as multiple instances proxying same bucket may be started at once
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)?
|
||||
.as_nanos()
|
||||
.to_string();
|
||||
|
||||
let path = RemotePath::from_string(&format!("write_access_{now}"))?;
|
||||
info!(%path, "uploading");
|
||||
|
||||
let body = now.to_string();
|
||||
let stream = bytes_to_stream(Bytes::from(body.clone()));
|
||||
client
|
||||
.upload(stream, body.len(), &path, None, &cancel)
|
||||
.await?;
|
||||
|
||||
use tokio::io::AsyncReadExt;
|
||||
info!(%path, "downloading");
|
||||
let download_opts = DownloadOpts {
|
||||
kind: remote_storage::DownloadKind::Small,
|
||||
..Default::default()
|
||||
};
|
||||
let mut body_read_buf = Vec::new();
|
||||
let stream = client
|
||||
.download(&path, &download_opts, &cancel)
|
||||
.await?
|
||||
.download_stream;
|
||||
tokio_util::io::StreamReader::new(stream)
|
||||
.read_to_end(&mut body_read_buf)
|
||||
.await?;
|
||||
let body_read = String::from_utf8(body_read_buf)?;
|
||||
if body != body_read {
|
||||
error!(%body, %body_read, "File contents do not match");
|
||||
anyhow::bail!("Read back file doesn't match original")
|
||||
}
|
||||
|
||||
info!(%path, "removing");
|
||||
client.delete(&path, &cancel).await
|
||||
}
|
||||
|
||||
fn bytes_to_stream(bytes: Bytes) -> impl futures::Stream<Item = std::io::Result<Bytes>> {
|
||||
futures::stream::once(futures::future::ready(Ok(bytes)))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use axum::{body::Body, extract::Request, response::Response};
|
||||
use http_body_util::BodyExt;
|
||||
use itertools::iproduct;
|
||||
use std::env::var;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use test_log::test as testlog;
|
||||
use tower::{Service, util::ServiceExt};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
// see libs/remote_storage/tests/test_real_s3.rs
|
||||
const REAL_S3_ENV: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
|
||||
const REAL_S3_BUCKET: &str = "REMOTE_STORAGE_S3_BUCKET";
|
||||
const REAL_S3_REGION: &str = "REMOTE_STORAGE_S3_REGION";
|
||||
|
||||
async fn proxy() -> (Storage, Option<camino_tempfile::Utf8TempDir>) {
|
||||
let cancel = CancellationToken::new();
|
||||
let (dir, storage) = if var(REAL_S3_ENV).is_err() {
|
||||
// tests execute in parallel and we need a new directory for each of them
|
||||
let dir = camino_tempfile::tempdir().unwrap();
|
||||
let fs =
|
||||
remote_storage::LocalFs::new(dir.path().into(), Duration::from_secs(5)).unwrap();
|
||||
(Some(dir), GenericRemoteStorage::LocalFs(fs))
|
||||
} else {
|
||||
// test_real_s3::create_s3_client is hard to reference, reimplementing here
|
||||
let millis = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis();
|
||||
use rand::Rng;
|
||||
let random = rand::thread_rng().r#gen::<u32>();
|
||||
|
||||
let s3_config = remote_storage::S3Config {
|
||||
bucket_name: var(REAL_S3_BUCKET).unwrap(),
|
||||
bucket_region: var(REAL_S3_REGION).unwrap(),
|
||||
prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
|
||||
endpoint: None,
|
||||
concurrency_limit: std::num::NonZeroUsize::new(100).unwrap(),
|
||||
max_keys_per_list_response: None,
|
||||
upload_storage_class: None,
|
||||
};
|
||||
let bucket = remote_storage::S3Bucket::new(&s3_config, Duration::from_secs(1))
|
||||
.await
|
||||
.unwrap();
|
||||
(None, GenericRemoteStorage::AwsS3(Arc::new(bucket)))
|
||||
};
|
||||
|
||||
let proxy = Storage {
|
||||
auth: object_storage::JwtAuth::new(TEST_PUB_KEY_ED25519).unwrap(),
|
||||
storage,
|
||||
cancel: cancel.clone(),
|
||||
max_upload_file_limit: usize::MAX,
|
||||
};
|
||||
check_storage_permissions(&proxy.storage, cancel)
|
||||
.await
|
||||
.unwrap();
|
||||
(proxy, dir)
|
||||
}
|
||||
|
||||
// see libs/utils/src/auth.rs
|
||||
const TEST_PUB_KEY_ED25519: &[u8] = b"
|
||||
-----BEGIN PUBLIC KEY-----
|
||||
MCowBQYDK2VwAyEARYwaNBayR+eGI0iXB4s3QxE3Nl2g1iWbr6KtLWeVD/w=
|
||||
-----END PUBLIC KEY-----
|
||||
";
|
||||
|
||||
const TEST_PRIV_KEY_ED25519: &[u8] = br#"
|
||||
-----BEGIN PRIVATE KEY-----
|
||||
MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
|
||||
-----END PRIVATE KEY-----
|
||||
"#;
|
||||
|
||||
async fn request(req: Request<Body>) -> Response<Body> {
|
||||
let (proxy, _) = proxy().await;
|
||||
app(Arc::new(proxy))
|
||||
.into_service()
|
||||
.oneshot(req)
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[testlog(tokio::test)]
|
||||
async fn status() {
|
||||
let res = Request::builder()
|
||||
.uri("/status")
|
||||
.body(Body::empty())
|
||||
.map(request)
|
||||
.unwrap()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
}
|
||||
|
||||
fn routes() -> impl Iterator<Item = (&'static str, &'static str)> {
|
||||
iproduct!(
|
||||
vec!["/1", "/1/2", "/1/2/3", "/1/2/3/4"],
|
||||
vec!["GET", "PUT", "DELETE"]
|
||||
)
|
||||
}
|
||||
|
||||
#[testlog(tokio::test)]
|
||||
async fn no_token() {
|
||||
for (uri, method) in routes() {
|
||||
info!(%uri, %method);
|
||||
let res = Request::builder()
|
||||
.uri(uri)
|
||||
.method(method)
|
||||
.body(Body::empty())
|
||||
.map(request)
|
||||
.unwrap()
|
||||
.await;
|
||||
assert!(matches!(
|
||||
res.status(),
|
||||
StatusCode::METHOD_NOT_ALLOWED | StatusCode::BAD_REQUEST
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
#[testlog(tokio::test)]
|
||||
async fn invalid_token() {
|
||||
for (uri, method) in routes() {
|
||||
info!(%uri, %method);
|
||||
let status = Request::builder()
|
||||
.uri(uri)
|
||||
.header("Authorization", "Bearer 123")
|
||||
.method(method)
|
||||
.body(Body::empty())
|
||||
.map(request)
|
||||
.unwrap()
|
||||
.await;
|
||||
assert!(matches!(
|
||||
status.status(),
|
||||
StatusCode::METHOD_NOT_ALLOWED | StatusCode::BAD_REQUEST
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
const TENANT_ID: TenantId =
|
||||
TenantId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6]);
|
||||
const TIMELINE_ID: TimelineId =
|
||||
TimelineId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
|
||||
const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
|
||||
fn token() -> String {
|
||||
let claims = object_storage::Claims {
|
||||
tenant_id: TENANT_ID,
|
||||
timeline_id: TIMELINE_ID,
|
||||
endpoint_id: ENDPOINT_ID.into(),
|
||||
exp: u64::MAX,
|
||||
};
|
||||
let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
|
||||
let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO);
|
||||
jsonwebtoken::encode(&header, &claims, &key).unwrap()
|
||||
}
|
||||
|
||||
#[testlog(tokio::test)]
|
||||
async fn unauthorized() {
|
||||
let (proxy, _) = proxy().await;
|
||||
let mut app = app(Arc::new(proxy)).into_service();
|
||||
let token = token();
|
||||
let args = itertools::iproduct!(
|
||||
vec![TENANT_ID.to_string(), TenantId::generate().to_string()],
|
||||
vec![TIMELINE_ID.to_string(), TimelineId::generate().to_string()],
|
||||
vec![ENDPOINT_ID, "ep-ololo"]
|
||||
)
|
||||
.skip(1);
|
||||
|
||||
for ((uri, method), (tenant, timeline, endpoint)) in iproduct!(routes(), args) {
|
||||
info!(%uri, %method, %tenant, %timeline, %endpoint);
|
||||
let request = Request::builder()
|
||||
.uri(format!("/{tenant}/{timeline}/{endpoint}/sub/path/key"))
|
||||
.method(method)
|
||||
.header("Authorization", format!("Bearer {}", token))
|
||||
.body(Body::empty())
|
||||
.unwrap();
|
||||
let status = ServiceExt::ready(&mut app)
|
||||
.await
|
||||
.unwrap()
|
||||
.call(request)
|
||||
.await
|
||||
.unwrap()
|
||||
.status();
|
||||
assert_eq!(status, StatusCode::UNAUTHORIZED);
|
||||
}
|
||||
}
|
||||
|
||||
#[testlog(tokio::test)]
|
||||
async fn method_not_allowed() {
|
||||
let token = token();
|
||||
let iter = iproduct!(vec!["", "/.."], vec!["GET", "PUT"]);
|
||||
for (key, method) in iter {
|
||||
let status = Request::builder()
|
||||
.uri(format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}{key}"))
|
||||
.method(method)
|
||||
.header("Authorization", format!("Bearer {token}"))
|
||||
.body(Body::empty())
|
||||
.map(request)
|
||||
.unwrap()
|
||||
.await
|
||||
.status();
|
||||
assert!(matches!(
|
||||
status,
|
||||
StatusCode::BAD_REQUEST | StatusCode::METHOD_NOT_ALLOWED
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
async fn requests_chain(
|
||||
chain: impl Iterator<Item = (String, &str, &'static str, StatusCode, bool)>,
|
||||
token: impl Fn(&str) -> String,
|
||||
) {
|
||||
let (proxy, _) = proxy().await;
|
||||
let mut app = app(Arc::new(proxy)).into_service();
|
||||
for (uri, method, body, expected_status, compare_body) in chain {
|
||||
info!(%uri, %method, %body, %expected_status);
|
||||
let bearer = format!("Bearer {}", token(&uri));
|
||||
let request = Request::builder()
|
||||
.uri(uri)
|
||||
.method(method)
|
||||
.header("Authorization", &bearer)
|
||||
.body(Body::from(body))
|
||||
.unwrap();
|
||||
let response = ServiceExt::ready(&mut app)
|
||||
.await
|
||||
.unwrap()
|
||||
.call(request)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(response.status(), expected_status);
|
||||
if !compare_body {
|
||||
continue;
|
||||
}
|
||||
let read_body = response.into_body().collect().await.unwrap().to_bytes();
|
||||
assert_eq!(body, read_body);
|
||||
}
|
||||
}
|
||||
|
||||
#[testlog(tokio::test)]
|
||||
async fn metrics() {
|
||||
let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/key");
|
||||
let req = vec![
|
||||
(uri.clone(), "PUT", "body", StatusCode::OK, false),
|
||||
(uri.clone(), "DELETE", "", StatusCode::OK, false),
|
||||
];
|
||||
requests_chain(req.into_iter(), |_| token()).await;
|
||||
|
||||
let res = Request::builder()
|
||||
.uri("/metrics")
|
||||
.body(Body::empty())
|
||||
.map(request)
|
||||
.unwrap()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
let body = res.into_body().collect().await.unwrap().to_bytes();
|
||||
let body = String::from_utf8_lossy(&body);
|
||||
tracing::debug!(%body);
|
||||
// Storage metrics are not gathered for LocalFs
|
||||
if var(REAL_S3_ENV).is_ok() {
|
||||
assert!(body.contains("remote_storage_s3_deleted_objects_total"));
|
||||
}
|
||||
assert!(body.contains("process_threads"));
|
||||
}
|
||||
|
||||
#[testlog(tokio::test)]
|
||||
async fn insert_retrieve_remove() {
|
||||
let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/key");
|
||||
let chain = vec![
|
||||
(uri.clone(), "GET", "", StatusCode::NOT_FOUND, false),
|
||||
(uri.clone(), "PUT", "пыщьпыщь", StatusCode::OK, false),
|
||||
(uri.clone(), "GET", "пыщьпыщь", StatusCode::OK, true),
|
||||
(uri.clone(), "DELETE", "", StatusCode::OK, false),
|
||||
(uri, "GET", "", StatusCode::NOT_FOUND, false),
|
||||
];
|
||||
requests_chain(chain.into_iter(), |_| token()).await;
|
||||
}
|
||||
|
||||
fn delete_prefix_token(uri: &str) -> String {
|
||||
use serde::Serialize;
|
||||
let parts = uri.split("/").collect::<Vec<&str>>();
|
||||
#[derive(Serialize)]
|
||||
struct PrefixClaims {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: Option<TimelineId>,
|
||||
endpoint_id: Option<object_storage::EndpointId>,
|
||||
exp: u64,
|
||||
}
|
||||
let claims = PrefixClaims {
|
||||
tenant_id: parts.get(1).map(|c| c.parse().unwrap()).unwrap(),
|
||||
timeline_id: parts.get(2).map(|c| c.parse().unwrap()),
|
||||
endpoint_id: parts.get(3).map(ToString::to_string),
|
||||
exp: u64::MAX,
|
||||
};
|
||||
let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
|
||||
let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO);
|
||||
jsonwebtoken::encode(&header, &claims, &key).unwrap()
|
||||
}
|
||||
|
||||
// Can't use single digit numbers as they won't be validated as TimelineId and EndpointId
|
||||
#[testlog(tokio::test)]
|
||||
async fn delete_prefix() {
|
||||
let tenant_id =
|
||||
TenantId::from_array([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).to_string();
|
||||
let t2 = TimelineId::from_array([2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
|
||||
let t3 = TimelineId::from_array([3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
|
||||
let t4 = TimelineId::from_array([4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
|
||||
let f = |timeline, path| format!("/{tenant_id}/{timeline}{path}");
|
||||
// Why extra slash in string literals? Axum is weird with URIs:
|
||||
// /1/2 and 1/2/ match different routes, thus first yields OK and second NOT_FOUND
|
||||
// as it matches /tenant/timeline/endpoint, see https://stackoverflow.com/a/75355932
|
||||
// The cost of removing trailing slash is suprisingly hard:
|
||||
// * Add tower dependency with NormalizePath layer
|
||||
// * wrap Router<()> in this layer https://github.com/tokio-rs/axum/discussions/2377
|
||||
// * Rewrite make_service() -> into_make_service()
|
||||
// * Rewrite oneshot() (not available for NormalizePath)
|
||||
// I didn't manage to get it working correctly
|
||||
let chain = vec![
|
||||
// create 1/2/3/4, 1/2/3/5, delete prefix 1/2/3 -> empty
|
||||
(f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
|
||||
(f(t2, "/3/4"), "PUT", "", StatusCode::OK, false), // we can override file contents
|
||||
(f(t2, "/3/5"), "PUT", "", StatusCode::OK, false),
|
||||
(f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
|
||||
(f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
|
||||
(f(t2, "/3/5"), "GET", "", StatusCode::NOT_FOUND, false),
|
||||
// create 1/2/3/4, 1/2/5/6, delete prefix 1/2/3 -> 1/2/5/6
|
||||
(f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
|
||||
(f(t2, "/5/6"), "PUT", "", StatusCode::OK, false),
|
||||
(f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
|
||||
(f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
|
||||
(f(t2, "/5/6"), "GET", "", StatusCode::OK, false),
|
||||
// create 1/2/3/4, 1/2/7/8, delete prefix 1/2 -> empty
|
||||
(f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
|
||||
(f(t2, "/7/8"), "PUT", "", StatusCode::OK, false),
|
||||
(f(t2, ""), "DELETE", "", StatusCode::OK, false),
|
||||
(f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
|
||||
(f(t2, "/7/8"), "GET", "", StatusCode::NOT_FOUND, false),
|
||||
// create 1/2/3/4, 1/2/5/6, 1/3/8/9, delete prefix 1/2/3 -> 1/2/5/6, 1/3/8/9
|
||||
(f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
|
||||
(f(t2, "/5/6"), "PUT", "", StatusCode::OK, false),
|
||||
(f(t3, "/8/9"), "PUT", "", StatusCode::OK, false),
|
||||
(f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
|
||||
(f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
|
||||
(f(t2, "/5/6"), "GET", "", StatusCode::OK, false),
|
||||
(f(t3, "/8/9"), "GET", "", StatusCode::OK, false),
|
||||
// create 1/4/5/6, delete prefix 1/2 -> 1/3/8/9, 1/4/5/6
|
||||
(f(t4, "/5/6"), "PUT", "", StatusCode::OK, false),
|
||||
(f(t2, ""), "DELETE", "", StatusCode::OK, false),
|
||||
(f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
|
||||
(f(t2, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
|
||||
(f(t3, "/8/9"), "GET", "", StatusCode::OK, false),
|
||||
(f(t4, "/5/6"), "GET", "", StatusCode::OK, false),
|
||||
// delete prefix 1 -> empty
|
||||
(format!("/{tenant_id}"), "DELETE", "", StatusCode::OK, false),
|
||||
(f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
|
||||
(f(t2, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
|
||||
(f(t3, "/8/9"), "GET", "", StatusCode::NOT_FOUND, false),
|
||||
(f(t4, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
|
||||
];
|
||||
requests_chain(chain.into_iter(), delete_prefix_token).await;
|
||||
}
|
||||
}
|
||||
344
object_storage/src/lib.rs
Normal file
344
object_storage/src/lib.rs
Normal file
@@ -0,0 +1,344 @@
|
||||
use anyhow::Result;
|
||||
use axum::extract::{FromRequestParts, Path};
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum::{RequestPartsExt, http::StatusCode, http::request::Parts};
|
||||
use axum_extra::TypedHeader;
|
||||
use axum_extra::headers::{Authorization, authorization::Bearer};
|
||||
use camino::Utf8PathBuf;
|
||||
use jsonwebtoken::{DecodingKey, Validation};
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt::Display;
|
||||
use std::result::Result as StdResult;
|
||||
use std::sync::Arc;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
// simplified version of utils::auth::JwtAuth
|
||||
pub struct JwtAuth {
|
||||
decoding_key: DecodingKey,
|
||||
validation: Validation,
|
||||
}
|
||||
|
||||
pub const VALIDATION_ALGO: jsonwebtoken::Algorithm = jsonwebtoken::Algorithm::EdDSA;
|
||||
impl JwtAuth {
|
||||
pub fn new(key: &[u8]) -> Result<Self> {
|
||||
Ok(Self {
|
||||
decoding_key: DecodingKey::from_ed_pem(key)?,
|
||||
validation: Validation::new(VALIDATION_ALGO),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn decode<T: serde::de::DeserializeOwned>(&self, token: &str) -> Result<T> {
|
||||
Ok(jsonwebtoken::decode(token, &self.decoding_key, &self.validation).map(|t| t.claims)?)
|
||||
}
|
||||
}
|
||||
|
||||
fn normalize_key(key: &str) -> StdResult<Utf8PathBuf, String> {
|
||||
let key = clean_utf8(&Utf8PathBuf::from(key));
|
||||
if key.starts_with("..") || key == "." || key == "/" {
|
||||
return Err(format!("invalid key {key}"));
|
||||
}
|
||||
match key.strip_prefix("/").map(Utf8PathBuf::from) {
|
||||
Ok(p) => Ok(p),
|
||||
_ => Ok(key),
|
||||
}
|
||||
}
|
||||
|
||||
// Copied from path_clean crate with PathBuf->Utf8PathBuf
|
||||
fn clean_utf8(path: &camino::Utf8Path) -> Utf8PathBuf {
|
||||
use camino::Utf8Component as Comp;
|
||||
let mut out = Vec::new();
|
||||
for comp in path.components() {
|
||||
match comp {
|
||||
Comp::CurDir => (),
|
||||
Comp::ParentDir => match out.last() {
|
||||
Some(Comp::RootDir) => (),
|
||||
Some(Comp::Normal(_)) => {
|
||||
out.pop();
|
||||
}
|
||||
None | Some(Comp::CurDir) | Some(Comp::ParentDir) | Some(Comp::Prefix(_)) => {
|
||||
out.push(comp)
|
||||
}
|
||||
},
|
||||
comp => out.push(comp),
|
||||
}
|
||||
}
|
||||
if !out.is_empty() {
|
||||
out.iter().collect()
|
||||
} else {
|
||||
Utf8PathBuf::from(".")
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Storage {
|
||||
pub auth: JwtAuth,
|
||||
pub storage: GenericRemoteStorage,
|
||||
pub cancel: CancellationToken,
|
||||
pub max_upload_file_limit: usize,
|
||||
}
|
||||
|
||||
pub type EndpointId = String; // If needed, reuse small string from proxy/src/types.rc
|
||||
|
||||
#[derive(Deserialize, Serialize, PartialEq)]
|
||||
pub struct Claims {
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub endpoint_id: EndpointId,
|
||||
pub exp: u64,
|
||||
}
|
||||
|
||||
impl Display for Claims {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"Claims(tenant_id {} timeline_id {} endpoint_id {} exp {})",
|
||||
self.tenant_id, self.timeline_id, self.endpoint_id, self.exp
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize)]
|
||||
struct KeyRequest {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
endpoint_id: EndpointId,
|
||||
path: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct S3Path {
|
||||
pub path: RemotePath,
|
||||
}
|
||||
|
||||
impl TryFrom<&KeyRequest> for S3Path {
|
||||
type Error = String;
|
||||
fn try_from(req: &KeyRequest) -> StdResult<Self, Self::Error> {
|
||||
let KeyRequest {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
endpoint_id,
|
||||
path,
|
||||
} = &req;
|
||||
let prefix = format!("{tenant_id}/{timeline_id}/{endpoint_id}",);
|
||||
let path = Utf8PathBuf::from(prefix).join(normalize_key(path)?);
|
||||
let path = RemotePath::new(&path).unwrap(); // unwrap() because the path is already relative
|
||||
Ok(S3Path { path })
|
||||
}
|
||||
}
|
||||
|
||||
fn unauthorized(route: impl Display, claims: impl Display) -> Response {
|
||||
debug!(%route, %claims, "route doesn't match claims");
|
||||
StatusCode::UNAUTHORIZED.into_response()
|
||||
}
|
||||
|
||||
pub fn bad_request(err: impl Display, desc: &'static str) -> Response {
|
||||
debug!(%err, desc);
|
||||
(StatusCode::BAD_REQUEST, err.to_string()).into_response()
|
||||
}
|
||||
|
||||
pub fn ok() -> Response {
|
||||
StatusCode::OK.into_response()
|
||||
}
|
||||
|
||||
pub fn internal_error(err: impl Display, path: impl Display, desc: &'static str) -> Response {
|
||||
error!(%err, %path, desc);
|
||||
StatusCode::INTERNAL_SERVER_ERROR.into_response()
|
||||
}
|
||||
|
||||
pub fn not_found(key: impl ToString) -> Response {
|
||||
(StatusCode::NOT_FOUND, key.to_string()).into_response()
|
||||
}
|
||||
|
||||
impl FromRequestParts<Arc<Storage>> for S3Path {
|
||||
type Rejection = Response;
|
||||
async fn from_request_parts(
|
||||
parts: &mut Parts,
|
||||
state: &Arc<Storage>,
|
||||
) -> Result<Self, Self::Rejection> {
|
||||
let Path(path): Path<KeyRequest> = parts
|
||||
.extract()
|
||||
.await
|
||||
.map_err(|e| bad_request(e, "invalid route"))?;
|
||||
let TypedHeader(Authorization(bearer)) = parts
|
||||
.extract::<TypedHeader<Authorization<Bearer>>>()
|
||||
.await
|
||||
.map_err(|e| bad_request(e, "invalid token"))?;
|
||||
let claims: Claims = state
|
||||
.auth
|
||||
.decode(bearer.token())
|
||||
.map_err(|e| bad_request(e, "decoding token"))?;
|
||||
let route = Claims {
|
||||
tenant_id: path.tenant_id,
|
||||
timeline_id: path.timeline_id,
|
||||
endpoint_id: path.endpoint_id.clone(),
|
||||
exp: claims.exp,
|
||||
};
|
||||
if route != claims {
|
||||
return Err(unauthorized(route, claims));
|
||||
}
|
||||
(&path)
|
||||
.try_into()
|
||||
.map_err(|e| bad_request(e, "invalid route"))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, PartialEq)]
|
||||
pub struct PrefixKeyPath {
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: Option<TimelineId>,
|
||||
pub endpoint_id: Option<EndpointId>,
|
||||
}
|
||||
|
||||
impl Display for PrefixKeyPath {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"PrefixKeyPath(tenant_id {} timeline_id {} endpoint_id {})",
|
||||
self.tenant_id,
|
||||
self.timeline_id
|
||||
.as_ref()
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or("".to_string()),
|
||||
self.endpoint_id
|
||||
.as_ref()
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or("".to_string())
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct PrefixS3Path {
|
||||
pub path: RemotePath,
|
||||
}
|
||||
|
||||
impl From<&PrefixKeyPath> for PrefixS3Path {
|
||||
fn from(path: &PrefixKeyPath) -> Self {
|
||||
let timeline_id = path
|
||||
.timeline_id
|
||||
.as_ref()
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or("".to_string());
|
||||
let endpoint_id = path
|
||||
.endpoint_id
|
||||
.as_ref()
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or("".to_string());
|
||||
let path = Utf8PathBuf::from(path.tenant_id.to_string())
|
||||
.join(timeline_id)
|
||||
.join(endpoint_id);
|
||||
let path = RemotePath::new(&path).unwrap(); // unwrap() because the path is already relative
|
||||
PrefixS3Path { path }
|
||||
}
|
||||
}
|
||||
|
||||
impl FromRequestParts<Arc<Storage>> for PrefixS3Path {
|
||||
type Rejection = Response;
|
||||
async fn from_request_parts(
|
||||
parts: &mut Parts,
|
||||
state: &Arc<Storage>,
|
||||
) -> Result<Self, Self::Rejection> {
|
||||
let Path(path) = parts
|
||||
.extract::<Path<PrefixKeyPath>>()
|
||||
.await
|
||||
.map_err(|e| bad_request(e, "invalid route"))?;
|
||||
let TypedHeader(Authorization(bearer)) = parts
|
||||
.extract::<TypedHeader<Authorization<Bearer>>>()
|
||||
.await
|
||||
.map_err(|e| bad_request(e, "invalid token"))?;
|
||||
let claims: PrefixKeyPath = state
|
||||
.auth
|
||||
.decode(bearer.token())
|
||||
.map_err(|e| bad_request(e, "invalid token"))?;
|
||||
if path != claims {
|
||||
return Err(unauthorized(path, claims));
|
||||
}
|
||||
Ok((&path).into())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn normalize_key() {
|
||||
let f = super::normalize_key;
|
||||
assert_eq!(f("hello/world/..").unwrap(), Utf8PathBuf::from("hello"));
|
||||
assert_eq!(
|
||||
f("ololo/1/../../not_ololo").unwrap(),
|
||||
Utf8PathBuf::from("not_ololo")
|
||||
);
|
||||
assert!(f("ololo/1/../../../").is_err());
|
||||
assert!(f(".").is_err());
|
||||
assert!(f("../").is_err());
|
||||
assert!(f("").is_err());
|
||||
assert_eq!(f("/1/2/3").unwrap(), Utf8PathBuf::from("1/2/3"));
|
||||
assert!(f("/1/2/3/../../../").is_err());
|
||||
assert!(f("/1/2/3/../../../../").is_err());
|
||||
}
|
||||
|
||||
const TENANT_ID: TenantId =
|
||||
TenantId::from_array([1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6]);
|
||||
const TIMELINE_ID: TimelineId =
|
||||
TimelineId::from_array([1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
|
||||
const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
|
||||
|
||||
#[test]
|
||||
fn s3_path() {
|
||||
let auth = Claims {
|
||||
tenant_id: TENANT_ID,
|
||||
timeline_id: TIMELINE_ID,
|
||||
endpoint_id: ENDPOINT_ID.into(),
|
||||
exp: u64::MAX,
|
||||
};
|
||||
let s3_path = |key| {
|
||||
let path = &format!("{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/{key}");
|
||||
let path = RemotePath::from_string(path).unwrap();
|
||||
S3Path { path }
|
||||
};
|
||||
|
||||
let path = "cache_key".to_string();
|
||||
let mut key_path = KeyRequest {
|
||||
path,
|
||||
tenant_id: auth.tenant_id,
|
||||
timeline_id: auth.timeline_id,
|
||||
endpoint_id: auth.endpoint_id,
|
||||
};
|
||||
assert_eq!(S3Path::try_from(&key_path).unwrap(), s3_path(key_path.path));
|
||||
|
||||
key_path.path = "we/can/have/nested/paths".to_string();
|
||||
assert_eq!(S3Path::try_from(&key_path).unwrap(), s3_path(key_path.path));
|
||||
|
||||
key_path.path = "../error/hello/../".to_string();
|
||||
assert!(S3Path::try_from(&key_path).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn prefix_s3_path() {
|
||||
let mut path = PrefixKeyPath {
|
||||
tenant_id: TENANT_ID,
|
||||
timeline_id: None,
|
||||
endpoint_id: None,
|
||||
};
|
||||
let prefix_path = |s: String| RemotePath::from_string(&s).unwrap();
|
||||
assert_eq!(
|
||||
PrefixS3Path::from(&path).path,
|
||||
prefix_path(format!("{TENANT_ID}"))
|
||||
);
|
||||
|
||||
path.timeline_id = Some(TIMELINE_ID);
|
||||
assert_eq!(
|
||||
PrefixS3Path::from(&path).path,
|
||||
prefix_path(format!("{TENANT_ID}/{TIMELINE_ID}"))
|
||||
);
|
||||
|
||||
path.endpoint_id = Some(ENDPOINT_ID.into());
|
||||
assert_eq!(
|
||||
PrefixS3Path::from(&path).path,
|
||||
prefix_path(format!("{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}"))
|
||||
);
|
||||
}
|
||||
}
|
||||
65
object_storage/src/main.rs
Normal file
65
object_storage/src/main.rs
Normal file
@@ -0,0 +1,65 @@
|
||||
//! `object_storage` is a service which provides API for uploading and downloading
|
||||
//! files. It is used by compute and control plane for accessing LFC prewarm data.
|
||||
//! This service is deployed either as a separate component or as part of compute image
|
||||
//! for large computes.
|
||||
mod app;
|
||||
use anyhow::Context;
|
||||
use tracing::info;
|
||||
use utils::logging;
|
||||
|
||||
//see set()
|
||||
const fn max_upload_file_limit() -> usize {
|
||||
100 * 1024 * 1024
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
#[serde(tag = "type")]
|
||||
struct Config {
|
||||
listen: std::net::SocketAddr,
|
||||
pemfile: camino::Utf8PathBuf,
|
||||
#[serde(flatten)]
|
||||
storage_config: remote_storage::RemoteStorageConfig,
|
||||
#[serde(default = "max_upload_file_limit")]
|
||||
max_upload_file_limit: usize,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
logging::init(
|
||||
logging::LogFormat::Plain,
|
||||
logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
|
||||
logging::Output::Stdout,
|
||||
)?;
|
||||
|
||||
let config: String = std::env::args().skip(1).take(1).collect();
|
||||
if config.is_empty() {
|
||||
anyhow::bail!("Usage: object_storage config.json")
|
||||
}
|
||||
info!("Reading config from {config}");
|
||||
let config = std::fs::read_to_string(config.clone())?;
|
||||
let config: Config = serde_json::from_str(&config).context("parsing config")?;
|
||||
info!("Reading pemfile from {}", config.pemfile.clone());
|
||||
let pemfile = std::fs::read(config.pemfile.clone())?;
|
||||
info!("Loading public key from {}", config.pemfile.clone());
|
||||
let auth = object_storage::JwtAuth::new(&pemfile)?;
|
||||
|
||||
let listener = tokio::net::TcpListener::bind(config.listen).await.unwrap();
|
||||
info!("listening on {}", listener.local_addr().unwrap());
|
||||
|
||||
let storage = remote_storage::GenericRemoteStorage::from_config(&config.storage_config).await?;
|
||||
let cancel = tokio_util::sync::CancellationToken::new();
|
||||
app::check_storage_permissions(&storage, cancel.clone()).await?;
|
||||
|
||||
let proxy = std::sync::Arc::new(object_storage::Storage {
|
||||
auth,
|
||||
storage,
|
||||
cancel: cancel.clone(),
|
||||
max_upload_file_limit: config.max_upload_file_limit,
|
||||
});
|
||||
|
||||
tokio::spawn(utils::signals::signal_handler(cancel.clone()));
|
||||
axum::serve(listener, app::app(proxy))
|
||||
.with_graceful_shutdown(async move { cancel.cancelled().await })
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -31,7 +31,6 @@ use pageserver::{
|
||||
};
|
||||
use postgres_backend::AuthType;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use tokio::signal::unix::SignalKind;
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
@@ -744,32 +743,7 @@ fn start_pageserver(
|
||||
let signal_token = CancellationToken::new();
|
||||
let signal_cancel = signal_token.child_token();
|
||||
|
||||
// Spawn signal handlers. Runs in a loop since we want to be responsive to multiple signals
|
||||
// even after triggering shutdown (e.g. a SIGQUIT after a slow SIGTERM shutdown). See:
|
||||
// https://github.com/neondatabase/neon/issues/9740.
|
||||
tokio::spawn(async move {
|
||||
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt()).unwrap();
|
||||
let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate()).unwrap();
|
||||
let mut sigquit = tokio::signal::unix::signal(SignalKind::quit()).unwrap();
|
||||
|
||||
loop {
|
||||
let signal = tokio::select! {
|
||||
_ = sigquit.recv() => {
|
||||
info!("Got signal SIGQUIT. Terminating in immediate shutdown mode.");
|
||||
std::process::exit(111);
|
||||
}
|
||||
_ = sigint.recv() => "SIGINT",
|
||||
_ = sigterm.recv() => "SIGTERM",
|
||||
};
|
||||
|
||||
if !signal_token.is_cancelled() {
|
||||
info!("Got signal {signal}. Terminating gracefully in fast shutdown mode.");
|
||||
signal_token.cancel();
|
||||
} else {
|
||||
info!("Got signal {signal}. Already shutting down.");
|
||||
}
|
||||
}
|
||||
});
|
||||
tokio::spawn(utils::signals::signal_handler(signal_token));
|
||||
|
||||
// Wait for cancellation signal and shut down the pageserver.
|
||||
//
|
||||
|
||||
@@ -417,6 +417,19 @@ class NeonLocalCli(AbstractNeonCli):
|
||||
cmd.append(f"--instance-id={instance_id}")
|
||||
return self.raw_cli(cmd)
|
||||
|
||||
def object_storage_start(self, timeout_in_seconds: int | None = None):
|
||||
cmd = ["object-storage", "start"]
|
||||
if timeout_in_seconds is not None:
|
||||
cmd.append(f"--start-timeout={timeout_in_seconds}s")
|
||||
return self.raw_cli(cmd)
|
||||
|
||||
def object_storage_stop(self, immediate: bool):
|
||||
cmd = ["object-storage", "stop"]
|
||||
if immediate:
|
||||
cmd.extend(["-m", "immediate"])
|
||||
return self.raw_cli(cmd)
|
||||
pass
|
||||
|
||||
def pageserver_start(
|
||||
self,
|
||||
id: int,
|
||||
|
||||
@@ -1023,6 +1023,8 @@ class NeonEnvBuilder:
|
||||
|
||||
self.env.broker.assert_no_errors()
|
||||
|
||||
self.env.object_storage.assert_no_errors()
|
||||
|
||||
try:
|
||||
self.overlay_cleanup_teardown()
|
||||
except Exception as e:
|
||||
@@ -1118,6 +1120,8 @@ class NeonEnv:
|
||||
pagectl_env_vars["RUST_LOG"] = self.rust_log_override
|
||||
self.pagectl = Pagectl(extra_env=pagectl_env_vars, binpath=self.neon_binpath)
|
||||
|
||||
self.object_storage = ObjectStorage(self)
|
||||
|
||||
# The URL for the pageserver to use as its control_plane_api config
|
||||
if config.storage_controller_port_override is not None:
|
||||
log.info(
|
||||
@@ -1173,6 +1177,7 @@ class NeonEnv:
|
||||
},
|
||||
"safekeepers": [],
|
||||
"pageservers": [],
|
||||
"object_storage": {"port": self.port_distributor.get_port()},
|
||||
"generate_local_ssl_certs": self.generate_local_ssl_certs,
|
||||
}
|
||||
|
||||
@@ -1408,6 +1413,8 @@ class NeonEnv:
|
||||
self.storage_controller.on_safekeeper_deploy(sk_id, body)
|
||||
self.storage_controller.safekeeper_scheduling_policy(sk_id, "Active")
|
||||
|
||||
self.object_storage.start(timeout_in_seconds=timeout_in_seconds)
|
||||
|
||||
def stop(self, immediate=False, ps_assert_metric_no_errors=False, fail_on_endpoint_errors=True):
|
||||
"""
|
||||
After this method returns, there should be no child processes running.
|
||||
@@ -1425,6 +1432,8 @@ class NeonEnv:
|
||||
except Exception as e:
|
||||
raise_later = e
|
||||
|
||||
self.object_storage.stop(immediate=immediate)
|
||||
|
||||
# Stop storage controller before pageservers: we don't want it to spuriously
|
||||
# detect a pageserver "failure" during test teardown
|
||||
self.storage_controller.stop(immediate=immediate)
|
||||
@@ -2635,6 +2644,26 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
self.stop(immediate=True)
|
||||
|
||||
|
||||
class ObjectStorage(LogUtils):
|
||||
def __init__(self, env: NeonEnv):
|
||||
service_dir = env.repo_dir / "object_storage"
|
||||
super().__init__(logfile=service_dir / "object_storage.log")
|
||||
self.conf_path = service_dir / "object_storage.json"
|
||||
self.env = env
|
||||
|
||||
def base_url(self):
|
||||
return json.loads(self.conf_path.read_text())["listen"]
|
||||
|
||||
def start(self, timeout_in_seconds: int | None = None):
|
||||
self.env.neon_cli.object_storage_start(timeout_in_seconds)
|
||||
|
||||
def stop(self, immediate: bool = False):
|
||||
self.env.neon_cli.object_storage_stop(immediate)
|
||||
|
||||
def assert_no_errors(self):
|
||||
assert_no_errors(self.logfile, "object_storage", [])
|
||||
|
||||
|
||||
class NeonProxiedStorageController(NeonStorageController):
|
||||
def __init__(self, env: NeonEnv, proxy_port: int, auth_enabled: bool, use_https: bool):
|
||||
super().__init__(env, proxy_port, auth_enabled, use_https)
|
||||
|
||||
@@ -134,10 +134,11 @@ def test_cli_start_stop(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# Stop default ps/sk
|
||||
# Stop default services
|
||||
env.neon_cli.pageserver_stop(env.pageserver.id)
|
||||
env.neon_cli.safekeeper_stop()
|
||||
env.neon_cli.storage_controller_stop(False)
|
||||
env.neon_cli.object_storage_stop(False)
|
||||
env.neon_cli.storage_broker_stop()
|
||||
|
||||
# Keep NeonEnv state up to date, it usually owns starting/stopping services
|
||||
@@ -179,11 +180,13 @@ def test_cli_start_stop_multi(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# Using the single-pageserver shortcut property throws when there are multiple pageservers
|
||||
with pytest.raises(AssertionError):
|
||||
_drop = env.pageserver
|
||||
_ = env.pageserver
|
||||
|
||||
env.neon_cli.safekeeper_stop(neon_env_builder.safekeepers_id_start + 1)
|
||||
env.neon_cli.safekeeper_stop(neon_env_builder.safekeepers_id_start + 2)
|
||||
|
||||
env.neon_cli.object_storage_stop(False)
|
||||
|
||||
# Stop this to get out of the way of the following `start`
|
||||
env.neon_cli.storage_controller_stop(False)
|
||||
env.neon_cli.storage_broker_stop()
|
||||
|
||||
56
test_runner/regress/test_object_storage.py
Normal file
56
test_runner/regress/test_object_storage.py
Normal file
@@ -0,0 +1,56 @@
|
||||
from time import time
|
||||
|
||||
import pytest
|
||||
from aiohttp import ClientSession
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
from jwcrypto import jwk, jwt
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_object_storage_insert_retrieve_delete(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Inserts, retrieves, and deletes test file using a JWT token
|
||||
"""
|
||||
env = neon_simple_env
|
||||
ep = env.endpoints.create_start(branch_name="main")
|
||||
tenant_id = str(ep.tenant_id)
|
||||
timeline_id = str(ep.show_timeline_id())
|
||||
endpoint_id = ep.endpoint_id
|
||||
|
||||
key_path = env.repo_dir / "auth_private_key.pem"
|
||||
key = jwk.JWK.from_pem(key_path.read_bytes())
|
||||
claims = {
|
||||
"tenant_id": tenant_id,
|
||||
"timeline_id": timeline_id,
|
||||
"endpoint_id": endpoint_id,
|
||||
"exp": round(time()) + 99,
|
||||
}
|
||||
log.info(f"key path {key_path}\nclaims {claims}")
|
||||
token = jwt.JWT(header={"alg": "EdDSA"}, claims=claims)
|
||||
token.make_signed_token(key)
|
||||
token = token.serialize()
|
||||
|
||||
base_url = env.object_storage.base_url()
|
||||
key = f"http://{base_url}/{tenant_id}/{timeline_id}/{endpoint_id}/key"
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
log.info(f"cache key url {key}")
|
||||
log.info(f"token {token}")
|
||||
|
||||
async with ClientSession(headers=headers) as session:
|
||||
async with session.get(key) as res:
|
||||
assert res.status == 404, f"Non-existing file is present: {res}"
|
||||
|
||||
data = b"cheburash"
|
||||
async with session.put(key, data=data) as res:
|
||||
assert res.status == 200, f"Error writing file: {res}"
|
||||
|
||||
async with session.get(key) as res:
|
||||
read_data = await res.read()
|
||||
assert data == read_data
|
||||
|
||||
async with session.delete(key) as res:
|
||||
assert res.status == 200, f"Error removing file {res}"
|
||||
|
||||
async with session.get(key) as res:
|
||||
assert res.status == 404, f"File was not deleted: {res}"
|
||||
@@ -95,6 +95,7 @@ def test_storage_controller_smoke(
|
||||
env.pageservers[1].start()
|
||||
for sk in env.safekeepers:
|
||||
sk.start()
|
||||
env.object_storage.start()
|
||||
|
||||
# The pageservers we started should have registered with the sharding service on startup
|
||||
nodes = env.storage_controller.node_list()
|
||||
@@ -346,6 +347,7 @@ def prepare_onboarding_env(
|
||||
env = neon_env_builder.init_configs()
|
||||
env.broker.start()
|
||||
env.storage_controller.start()
|
||||
env.object_storage.start()
|
||||
|
||||
# This is the pageserver where we'll initially create the tenant. Run it in emergency
|
||||
# mode so that it doesn't talk to storage controller, and do not register it.
|
||||
|
||||
Reference in New Issue
Block a user