Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-17 18:32:56 +00:00)

Compare commits: proxy-cpla… ... pg-extensi… (21 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 34ac170904 | |
| | 86d7cb674d | |
| | 7d3a7091df | |
| | d6848d53eb | |
| | 4b21d23785 | |
| | 30815582a7 | |
| | 0c515ac034 | |
| | fa6472e2a1 | |
| | fc35a19ede | |
| | 79459e8c0a | |
| | 5df798c454 | |
| | eebe9c513f | |
| | 6990102bb2 | |
| | a5e8e38bc5 | |
| | 77217a473d | |
| | 6f0246372a | |
| | 77aa65f2f2 | |
| | 38bed024f2 | |
| | 40089beac5 | |
| | bf033294b1 | |
| | fb6a942665 | |
.gitignore (vendored, 1 line changed)

@@ -1,5 +1,6 @@
/pg_install
/target
/alek_ext/target
/tmp_check
/tmp_check_cli
__pycache__/
Cargo.lock (generated, 6 lines changed)

@@ -37,6 +37,10 @@ dependencies = [
 "memchr",
]

[[package]]
name = "alek_ext"
version = "0.1.0"

[[package]]
name = "android_system_properties"
version = "0.1.5"

@@ -924,12 +928,14 @@ dependencies = [
 "opentelemetry",
 "postgres",
 "regex",
 "remote_storage",
 "reqwest",
 "serde",
 "serde_json",
 "tar",
 "tokio",
 "tokio-postgres",
 "toml_edit",
 "tracing",
 "tracing-opentelemetry",
 "tracing-subscriber",
@@ -1,5 +1,6 @@
[workspace]
members = [
    "alek_ext",
    "compute_tools",
    "control_plane",
    "pageserver",
alek_ext/Cargo.lock (generated, new file, 3210 lines): file diff suppressed because it is too large.
alek_ext/Cargo.toml (new file, 21 lines)

@@ -0,0 +1,21 @@
[package]
name = "alek_ext"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0.71"
aws-config = { version = "0.55", default-features = false, features=["rustls"] }
aws-sdk-s3 = "0.27"
aws-smithy-http = "0.55"
aws-credential-types = "0.55"
aws-types = "0.55"
remote_storage = { version = "0.1", path = "../libs/remote_storage/" }
tokio = "1.28.2"
toml_edit = "0.19.10"
tracing = "0.1.37"
tracing-subscriber = "0.3.17"

[workspace]
alek_ext/fuzzystrmatch.control (new file, 6 lines)

@@ -0,0 +1,6 @@
# fuzzystrmatch extension
comment = 'determine similarities and distance between strings'
default_version = '1.2'
module_pathname = '$libdir/fuzzystrmatch'
relocatable = true
trusted = true
alek_ext/pg_cron.control (new file, 5 lines)

@@ -0,0 +1,5 @@
comment = 'Job scheduler for PostgreSQL'
default_version = '1.5'
module_pathname = '$libdir/pg_cron'
relocatable = false
schema = pg_catalog
alek_ext/src/awsmwe_v1.rs (new file, 33 lines)

@@ -0,0 +1,33 @@
/*
 * This is a MWE of using the aws-sdk-s3 to download a file from an S3 bucket
 * */

use aws_sdk_s3::{self, config::Region, Error};
use aws_config::{self, meta::region::RegionProviderChain};


#[tokio::main]
async fn main() -> Result<(), Error> {
    let region_provider = RegionProviderChain::first_try(Region::new("eu-central-1"))
        .or_default_provider()
        .or_else(Region::new("eu-central-1"));

    let shared_config = aws_config::from_env().region(region_provider).load().await;
    let client = aws_sdk_s3::Client::new(&shared_config);

    let bucket_name = "neon-dev-extensions";
    let object_key = "fuzzystrmatch.control";
    let response = client
        .get_object()
        .bucket(bucket_name)
        .key(object_key)
        .send()
        .await?;

    let stuff = response.body;
    let data = stuff.collect().await.expect("error reading data").to_vec();
    println!("data: {:?}", std::str::from_utf8(&data));

    Ok(())
}
alek_ext/src/download_with_remote_api_v2.rs (new file, 52 lines)

@@ -0,0 +1,52 @@
/* This is a MWE of using our RemoteStorage API to call the aws stuff and download a file
 *
 */

use remote_storage::*;
use std::path::Path;
use std::fs::File;
use std::io::{BufWriter, Write};
use toml_edit;
use anyhow;
use tokio::io::AsyncReadExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let from_path = "fuzzystrmatch.control";
    let remote_from_path = RemotePath::new(Path::new(from_path))?;
    println!("{:?}", remote_from_path.clone());

    // read configurations from `pageserver.toml`
    let cfg_file_path = Path::new("./../.neon/pageserver.toml");
    let cfg_file_contents = std::fs::read_to_string(cfg_file_path).unwrap();
    let toml = cfg_file_contents
        .parse::<toml_edit::Document>()
        .expect("Error parsing toml");
    let remote_storage_data = toml.get("remote_storage")
        .expect("field should be present");
    let remote_storage_config = RemoteStorageConfig::from_toml(remote_storage_data)
        .expect("error parsing toml")
        .expect("error parsing toml");

    // query S3 bucket
    let remote_storage = GenericRemoteStorage::from_config(&remote_storage_config)?;
    let from_path = "fuzzystrmatch.control";
    let remote_from_path = RemotePath::new(Path::new(from_path))?;

    println!("{:?}", remote_from_path.clone());
    // if let GenericRemoteStorage::AwsS3(mybucket) = remote_storage {
    //     println!("{:?}", mybucket.relative_path_to_s3_object(&remote_from_path));
    // }
    let mut data = remote_storage.download(&remote_from_path).await.expect("data yay");
    let mut write_data_buffer = Vec::new();
    data.download_stream.read_to_end(&mut write_data_buffer).await?;
    let f = File::create("alek.out").expect("problem creating file");
    let mut f = BufWriter::new(f);
    f.write_all(&mut write_data_buffer).expect("error writing data");

    // let stuff = response.body;
    // let data = stuff.collect().await.expect("error reading data").to_vec();
    // println!("data: {:?}", std::str::from_utf8(&data));

    Ok(())
}
alek_ext/src/localfiledownload_v0.rs (new file, 53 lines)

@@ -0,0 +1,53 @@
/*
 * This is a MWE of "downloading" a local file from a fake local bucket
 * */

use remote_storage::*;
use std::path::Path;
use std::fs::File;
use std::io::{BufWriter, Write};
use toml_edit;
use anyhow;
use tokio::io::AsyncReadExt;

async fn download_file() -> anyhow::Result<()> {
    // read configurations from `pageserver.toml`
    let cfg_file_path = Path::new("./../.neon/pageserver.toml");
    let cfg_file_contents = std::fs::read_to_string(cfg_file_path).unwrap();
    let toml = cfg_file_contents
        .parse::<toml_edit::Document>()
        .expect("Error parsing toml");
    let remote_storage_data = toml.get("remote_storage")
        .expect("field should be present");
    let remote_storage_config = RemoteStorageConfig::from_toml(remote_storage_data)
        .expect("error parsing toml")
        .expect("error parsing toml");

    // query S3 bucket
    let remote_storage = GenericRemoteStorage::from_config(&remote_storage_config)?;
    let from_path = "neon-dev-extensions/fuzzystrmatch.control";
    let remote_from_path = RemotePath::new(Path::new(from_path))?;

    println!("im fine");
    println!("{:?}", remote_storage_config);

    let mut data = remote_storage.download(&remote_from_path).await.expect("data yay");
    let mut write_data_buffer = Vec::new();

    data.download_stream.read_to_end(&mut write_data_buffer).await?;

    // write `data` to a file locally
    let f = File::create("alek.out").expect("problem creating file");
    let mut f = BufWriter::new(f);
    f.write_all(&mut write_data_buffer).expect("error writing data");

    Ok(())
}

#[tokio::main]
async fn main() {
    match download_file().await {
        Err(_) => println!("Err"),
        _ => println!("SUCEECESS"),
    }
}
alek_ext/src/main.rs (new file, 53 lines)

@@ -0,0 +1,53 @@
/*
 * This is a MWE of using the RemoteStorage API to list and download files from aws
 */
macro_rules! alek { ($expression:expr) => { println!("{:?}", $expression); }; }

use remote_storage::*;
use std::path::Path;
use std::fs::File;
use std::io::{BufWriter, Write};
use toml_edit;
use anyhow::{self, Context};
use tokio::io::AsyncReadExt;
use tracing::*;
use tracing_subscriber;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let subscriber = tracing_subscriber::FmtSubscriber::new();
    tracing::subscriber::set_global_default(subscriber)?;
    // TODO: read configs from a different place!
    let cfg_file_path = Path::new("./../.neon/pageserver.toml");
    let cfg_file_contents = std::fs::read_to_string(cfg_file_path)
        .with_context(|| format!("Failed to read pageserver config at '{}'", cfg_file_path.display()))?;
    let toml = cfg_file_contents
        .parse::<toml_edit::Document>()
        .with_context(|| format!("Failed to parse '{}' as pageserver config", cfg_file_path.display()))?;
    let remote_storage_data = toml.get("remote_storage")
        .context("field should be present")?;
    let remote_storage_config = RemoteStorageConfig::from_toml(remote_storage_data)?
        .context("error configuring remote storage")?;
    let remote_storage = GenericRemoteStorage::from_config(&remote_storage_config)?;

    let folder = RemotePath::new(Path::new("public_extensions"))?;
    // lists all the files in the public_extensions folder
    let from_paths = remote_storage.list_files(Some(&folder)).await?;
    alek!(from_paths);
    for remote_from_path in from_paths {
        if remote_from_path.extension() == Some("control") {
            let file_name = remote_from_path.object_name().expect("it must exist");
            info!("{:?}", file_name);
            alek!(&remote_from_path);
            // download the file
            let mut download = remote_storage.download(&remote_from_path).await?;
            // write the file to a local location
            let mut write_data_buffer = Vec::new();
            download.download_stream.read_to_end(&mut write_data_buffer).await?;
            let mut output_file = BufWriter::new(File::create(file_name)?);
            output_file.write_all(&mut write_data_buffer)?;
        }
    }

    Ok(())
}
alek_ext/src/old_multiple_files.rs (new file, 65 lines)

@@ -0,0 +1,65 @@
/*
 **WIP**
 * This is a MWE of using our RemoteStorage API to call the aws stuff and download multiple files
 */

#![allow(unused_imports)]
use remote_storage::*;
use std::path::Path;
use std::fs::File;
use std::io::{BufWriter, Write};
use toml_edit;
use anyhow::{self, Context};
use tokio::io::AsyncReadExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    /* me trying to hack RemotePath into submission */

    let cfg_file_path = Path::new("./../.neon/pageserver.toml");
    let cfg_file_contents = std::fs::read_to_string(cfg_file_path)
        .expect("couldn't find pageserver.toml; make sure you are in neon/alek_ext");
    let toml = cfg_file_contents
        .parse::<toml_edit::Document>()
        .expect("Error parsing toml");
    let remote_storage_data = toml.get("remote_storage")
        .expect("field should be present");
    let remote_storage_config = RemoteStorageConfig::from_toml(remote_storage_data)
        .expect("error parsing toml")
        .expect("error parsing toml");
    let remote_storage = GenericRemoteStorage::from_config(&remote_storage_config)?;

    if let GenericRemoteStorage::AwsS3(mybucket) = remote_storage {
        let resp = mybucket
            .client
            .list_objects_v2()
            .bucket("neon-dev-extensions")
            .set_prefix(Some("public_extensions".to_string()))
            .delimiter("/".to_string())
            .send().await?;

        let z = resp.common_prefixes.unwrap();
        for yy in z {
            println!("plzplz: {:?}", yy);
        }
        let mut i = 0;
        // NOTE (WIP): `from_paths` is not defined anywhere in this file yet; it is meant to be
        // the list of objects returned by the listing above.
        for remote_from_path in from_paths {
            i += 1;
            println!("{:?}", &remote_from_path);
            if remote_from_path.extension() == Some("control") {
                let mut data = remote_storage.download(&remote_from_path).await?;
                // write `data` to a file locally
                // TODO: I think that the way I'm doing this is not optimal;
                // It should be possible to write the data directly to a file
                // rather than first writing it to a vector...
                let mut write_data_buffer = Vec::new();
                data.download_stream.read_to_end(&mut write_data_buffer).await?;
                let f = File::create(format!("alek{i}.out")).expect("problem creating file");
                let mut f = BufWriter::new(f);
                f.write_all(&mut write_data_buffer).expect("error writing data");
            }
        }
    }

    Ok(())
}
@@ -30,3 +30,5 @@ url.workspace = true
compute_api.workspace = true
utils.workspace = true
workspace_hack.workspace = true
remote_storage = { version = "0.1", path = "../libs/remote_storage/" }
toml_edit.workspace = true
@@ -53,11 +53,20 @@ use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::spec::*;
use compute_tools::extensions::*;

fn main() -> Result<()> {
#[tokio::main]
async fn main() -> Result<()> {
    init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;

    let matches = cli().get_matches();
    let config = get_s3_config(&matches)
        .expect("Hopefully get_s3_config works");
    download_extension(&config, ExtensionType::Shared)
        .await
        .expect("Assume downloads can't error.");
    // let mut file = File::create("alek.txt")?;
    // file.write_all(b"success?")?;

    let http_port = *matches
        .get_one::<u16>("http-port")

@@ -202,6 +211,9 @@ fn main() -> Result<()> {
    // We got all we need, update the state.
    let mut state = compute.state.lock().unwrap();

    // Now we have the spec, and also the tenant id, so we can download the user's personal extensions
    // download_extension(&config, ExtensionType::Tenant(FIXME tenant_id.into()));

    // Record for how long we slept waiting for the spec.
    state.metrics.wait_for_spec_ms = Utc::now()
        .signed_duration_since(state.start_time)

@@ -221,6 +233,9 @@ fn main() -> Result<()> {
    let _configurator_handle =
        launch_configurator(&compute).expect("cannot launch configurator thread");

    // Now we are ready to download library extensions
    // download_extension(&config, ExtensionType::Library(FIXME library_name.into()));

    // Start Postgres
    let mut delay_exit = false;
    let mut exit_code = None;
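The two commented-out `download_extension` calls above mark where the tenant-specific and library-specific downloads are meant to hook in once the spec is known. Purely as an illustration (not part of this diff; `tenant_id` and `library_name` are placeholder values that would come from the compute spec), those calls would look roughly like this, using the `ExtensionType` variants introduced in `compute_tools/src/extensions.rs` below:

```rust
// Hypothetical sketch of the commented-out hook points; not part of this diff.
use compute_tools::extensions::{download_extension, ExtensionType};
use remote_storage::RemoteStorageConfig;

async fn download_remaining_extensions(
    config: &RemoteStorageConfig,
    tenant_id: String,    // placeholder: would come from the compute spec
    library_name: String, // placeholder: one of the preloaded libraries
) -> anyhow::Result<()> {
    // After the spec (and thus the tenant id) is known:
    download_extension(config, ExtensionType::Tenant(tenant_id)).await?;
    // Before Postgres starts, for a preloaded library:
    download_extension(config, ExtensionType::Library(library_name)).await?;
    Ok(())
}
```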
compute_tools/src/extensions.rs (new file, 96 lines)

@@ -0,0 +1,96 @@
// This is some code for downloading postgres extensions from AWS s3
use std::path::Path;
use std::fs::File;
use std::io::{BufWriter, Write};
use clap::ArgMatches;
use anyhow::Context;
use tracing::info;
use tokio::io::AsyncReadExt;
use toml_edit;
use remote_storage::*;

fn get_pg_config(argument: &str) -> String {
    // FIXME: this function panics if it runs into any issues
    let config_output = std::process::Command::new("pg_config")
        .arg(argument)
        .output()
        .expect("pg_config should be installed");
    assert!(config_output.status.success());
    let stdout = std::str::from_utf8(&config_output.stdout).unwrap();
    stdout.trim().to_string()
}

// NOTE: `to_path` is not used yet; the file is currently written to the working directory.
async fn download_helper(remote_storage: &GenericRemoteStorage, remote_from_path: &RemotePath, to_path: &str) -> anyhow::Result<()> {
    let file_name = remote_from_path.object_name().expect("it must exist");
    info!("Downloading {:?}", file_name);
    let mut download = remote_storage.download(remote_from_path).await?;
    let mut write_data_buffer = Vec::new();
    download.download_stream.read_to_end(&mut write_data_buffer).await?;
    let mut output_file = BufWriter::new(File::create(file_name)?);
    output_file.write_all(&write_data_buffer)?;
    Ok(())
}

pub enum ExtensionType {
    Shared,
    Tenant(String),
    Library(String),
}

pub async fn download_extension(config: &RemoteStorageConfig, ext_type: ExtensionType) -> anyhow::Result<()> {
    let sharedir = get_pg_config("--sharedir");
    let sharedir = format!("{}/extension", sharedir);
    let libdir = get_pg_config("--libdir");
    let remote_storage = GenericRemoteStorage::from_config(config)?;

    match ext_type {
        ExtensionType::Shared => {
            // 1. Download control files from s3-bucket/public/*.control to SHAREDIR/extension
            // We can do this step even before we have spec,
            // because public extensions are common for all projects.
            let folder = RemotePath::new(Path::new("public_extensions"))?;
            let from_paths = remote_storage.list_files(Some(&folder)).await?;
            for remote_from_path in from_paths {
                if remote_from_path.extension() == Some("control") {
                    // FIXME: CAUTION: if you run this, it will actually write stuff to my postgres directory,
                    // but atm the stuff that it is going to write is not good.
                    // Don't run atm without changing the path.
                    download_helper(&remote_storage, &remote_from_path, &sharedir).await?;
                }
            }
        }
        ExtensionType::Tenant(tenant_id) => {
            // 2. After we have spec, before project start:
            // Download control files from s3-bucket/[tenant-id]/*.control to SHAREDIR/extension
            let folder = RemotePath::new(Path::new(&tenant_id))?;
            let from_paths = remote_storage.list_files(Some(&folder)).await?;
            for remote_from_path in from_paths {
                if remote_from_path.extension() == Some("control") {
                    download_helper(&remote_storage, &remote_from_path, &sharedir).await?;
                }
            }
        }
        ExtensionType::Library(library_name) => {
            // 3. After we have spec, before postgres start:
            // Download preload_shared_libraries from s3-bucket/public/[library-name].control into LIBDIR/
            let from_path = format!("neon-dev-extensions/public/{library_name}.control");
            let remote_from_path = RemotePath::new(Path::new(&from_path))?;
            download_helper(&remote_storage, &remote_from_path, &libdir).await?;
        }
    }
    Ok(())
}

pub fn get_s3_config(arg_matches: &ArgMatches) -> anyhow::Result<RemoteStorageConfig> {
    // TODO: Right now we are using the same config parameters as pageserver; but should we have our own configs?
    // TODO: Should we read the s3_config from CLI arguments?
    let cfg_file_path = Path::new("./../.neon/pageserver.toml");
    let cfg_file_contents = std::fs::read_to_string(cfg_file_path)
        .with_context(|| format!("Failed to read pageserver config at '{}'", cfg_file_path.display()))?;
    let toml = cfg_file_contents
        .parse::<toml_edit::Document>()
        .with_context(|| format!("Failed to parse '{}' as pageserver config", cfg_file_path.display()))?;
    let remote_storage_data = toml.get("remote_storage")
        .context("field should be present")?;
    let remote_storage_config = RemoteStorageConfig::from_toml(remote_storage_data)?
        .context("error configuring remote storage")?;
    Ok(remote_storage_config)
}
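The FIXMEs in `download_helper` flag two gaps: the whole object is buffered into a `Vec` before being written, and `to_path` is ignored, so files land in the current working directory rather than in SHAREDIR/extension or LIBDIR. A possible follow-up shape, sketched here only as an illustration (it is not part of this diff, and it assumes `download_stream` implements `tokio::io::AsyncRead + Unpin`), streams the object straight into the target directory:

```rust
// Hypothetical streaming variant of download_helper; not part of this diff.
use std::path::Path;
use anyhow::Context;
use remote_storage::{GenericRemoteStorage, RemotePath};

async fn download_helper_streaming(
    remote_storage: &GenericRemoteStorage,
    remote_from_path: &RemotePath,
    to_path: &str,
) -> anyhow::Result<()> {
    let file_name = remote_from_path
        .object_name()
        .context("remote path has no object name")?;
    let mut download = remote_storage.download(remote_from_path).await?;
    // Copy the stream directly into `to_path` (e.g. SHAREDIR/extension),
    // without an intermediate in-memory buffer.
    let mut output = tokio::fs::File::create(Path::new(to_path).join(file_name)).await?;
    tokio::io::copy(&mut download.download_stream, &mut output).await?;
    Ok(())
}
```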
@@ -13,3 +13,4 @@ pub mod monitor;
pub mod params;
pub mod pg_helpers;
pub mod spec;
pub mod extensions;
fakes3/public_extensions/fuzzystrmatch.control (new file, 5 lines)

@@ -0,0 +1,5 @@
this
Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet.

lol
fakes3/public_extensions/pg_cron.control (new file, 7 lines)

@@ -0,0 +1,7 @@
this
ax mod p < p/2

its gonna be big!
coming soon at 4pm
lol
fakes3/tenant123/test.control (new file, 3 lines)

@@ -0,0 +1,3 @@

Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet.
@@ -70,6 +70,10 @@ impl RemotePath {
    pub fn join(&self, segment: &Path) -> Self {
        Self(self.0.join(segment))
    }

    pub fn extension(&self) -> Option<&str> {
        self.0.extension()?.to_str()
    }
}

/// Storage (potentially remote) API to manage its state.

@@ -86,6 +90,12 @@ pub trait RemoteStorage: Send + Sync + 'static {
        prefix: Option<&RemotePath>,
    ) -> Result<Vec<RemotePath>, DownloadError>;

    /// Lists all files in subdirectories
    async fn list_files(
        &self,
        prefix: Option<&RemotePath>,
    ) -> anyhow::Result<Vec<RemotePath>>;

    /// Streams the local file contents into the remote storage entry.
    async fn upload(
        &self,

@@ -161,6 +171,23 @@ pub enum GenericRemoteStorage {
}

impl GenericRemoteStorage {
    // A function for listing all the files in a "directory"
    // Example:
    // list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"]
    pub async fn list_files(
        &self,
        folder: Option<&RemotePath>
    ) -> anyhow::Result<Vec<RemotePath>> {
        match self {
            Self::LocalFs(s) => s.list_files(folder).await,
            Self::AwsS3(s) => s.list_files(folder).await,
            Self::Unreliable(s) => s.list_files(folder).await,
        }
    }

    // lists common *prefixes*, if any, of files
    // Example:
    // list_prefixes("foo123","foo567","bar123","bar432") = ["foo", "bar"]
    pub async fn list_prefixes(
        &self,
        prefix: Option<&RemotePath>,
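Together, the new `RemotePath::extension` helper, the `list_files` trait method, and the `GenericRemoteStorage::list_files` dispatcher support the "list a folder, keep the `.control` files" pattern used by `compute_ctl` and the `alek_ext` prototypes above. A minimal usage sketch (illustrative only, not part of this diff):

```rust
// Illustrative caller of the new list_files/extension API; not part of this diff.
use std::path::Path;
use remote_storage::{GenericRemoteStorage, RemotePath};

async fn list_control_files(
    storage: &GenericRemoteStorage,
    prefix: &str,
) -> anyhow::Result<Vec<RemotePath>> {
    let folder = RemotePath::new(Path::new(prefix))?;
    let files = storage.list_files(Some(&folder)).await?;
    Ok(files
        .into_iter()
        .filter(|path| path.extension() == Some("control"))
        .collect())
}
```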
@@ -116,6 +116,34 @@ impl RemoteStorage for LocalFs {
            .collect())
    }

    async fn list_files(
        &self,
        folder: Option<&RemotePath>
    ) -> anyhow::Result<Vec<RemotePath>> {
        /* Note: if you want, you can return a DownloadError instead of an anyhow::Error
        as follows: replace all ?'s with:
        .map_err(|e| DownloadError::Other(anyhow::Error::from(e)))?;
        */
        let full_path = match folder.clone() {
            Some(folder) => folder.with_base(&self.storage_root),
            None => self.storage_root.clone(),
        };
        let mut entries = fs::read_dir(full_path).await?;
        let mut files = vec![];
        while let Some(entry) = entries.next_entry().await? {
            let file_name: PathBuf = entry.file_name().into();
            let file_type = entry.file_type().await?;
            if file_type.is_file() {
                let mut file_remote_path = RemotePath::new(&file_name)?;
                if let Some(folder) = folder {
                    file_remote_path = folder.join(&file_name);
                }
                files.push(file_remote_path);
            }
        }
        Ok(files)
    }

    async fn upload(
        &self,
        data: impl io::AsyncRead + Unpin + Send + Sync + 'static,
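The note inside `list_files` points out that the same body could surface failures as `DownloadError` rather than `anyhow::Error`. A minimal sketch of that mapping applied to the first I/O call (illustrative only, not part of this diff):

```rust
// Illustrative error mapping, as suggested by the note above; not part of this diff.
use std::path::Path;
use remote_storage::DownloadError;

async fn read_dir_or_download_error(root: &Path) -> Result<tokio::fs::ReadDir, DownloadError> {
    tokio::fs::read_dir(root)
        .await
        .map_err(|e| DownloadError::Other(anyhow::Error::from(e)))
}
```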
@@ -332,6 +332,51 @@ impl RemoteStorage for S3Bucket {
        Ok(document_keys)
    }

    async fn list_files(
        &self,
        folder: Option<&RemotePath>
    ) -> anyhow::Result<Vec<RemotePath>> {
        let folder_name = folder.map(|x|
            String::from(x.object_name().expect("invalid folder name"))
        );
        let mut continuation_token = None;
        let mut all_files = vec![];
        loop {
            let _guard = self
                .concurrency_limiter
                .acquire()
                .await
                .context("Concurrency limiter semaphore got closed during S3 list_files")?;
            metrics::inc_list_objects();

            let response = self
                .client
                .list_objects_v2()
                .bucket(self.bucket_name.clone())
                .set_prefix(folder_name.clone())
                .set_continuation_token(continuation_token)
                .set_max_keys(self.max_keys_per_list_response)
                .send()
                .await
                .map_err(|e| {
                    metrics::inc_list_objects_fail();
                    e
                })
                .context("Failed to list files in S3 bucket")?;

            for object in response.contents().unwrap_or_default() {
                let object_path = object.key().unwrap();
                let remote_path = self.s3_object_to_relative_path(object_path);
                all_files.push(remote_path);
            }
            match response.next_continuation_token {
                Some(new_token) => continuation_token = Some(new_token),
                None => break,
            }
        }
        Ok(all_files)
    }

    async fn upload(
        &self,
        from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
@@ -82,6 +82,14 @@ impl RemoteStorage for UnreliableWrapper {
        self.inner.list_prefixes(prefix).await
    }

    async fn list_files(
        &self,
        folder: Option<&RemotePath>
    ) -> anyhow::Result<Vec<RemotePath>> {
        // NOTE: reuses the ListPrefixes op kind to record this attempt.
        self.attempt(RemoteOp::ListPrefixes(folder.cloned()))?;
        self.inner.list_files(folder).await
    }

    async fn upload(
        &self,
        data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,