alek: add download files changes

This commit is contained in:
Alek Westover
2023-06-14 10:50:25 -04:00
parent 8013f9630d
commit bb931f2ce0
5 changed files with 201 additions and 0 deletions

View File

@@ -3,6 +3,8 @@ use std::net::{TcpListener, TcpStream};
use std::path::Path;
use std::str;
use std::{fs, thread};
use toml_edit;
use remote_storage::*;
use anyhow::Context;
use tracing::info;
@@ -18,6 +20,7 @@ pub fn download_file(mut stream: TcpStream) -> anyhow::Result<()> {
println!("requested file {}", filename);
download_extension(get_s3_config(), ExtensionType::Shared);
let from_prefix = "/tmp/from_prefix";
let to_prefix = "/tmp/to_prefix";
@@ -33,3 +36,91 @@ pub fn download_file(mut stream: TcpStream) -> anyhow::Result<()> {
Ok(())
}
/// Query `pg_config` for a single value (e.g. `--sharedir`, `--libdir`)
/// and return it with surrounding whitespace trimmed.
// FIXME: this function panics if it runs into any issues
fn get_pg_config(argument: &str) -> String {
    let output = std::process::Command::new("pg_config")
        .arg(argument)
        .output()
        .expect("pg_config should be installed");
    assert!(output.status.success());
    // pg_config prints the value followed by a newline; strip it.
    std::str::from_utf8(&output.stdout)
        .unwrap()
        .trim()
        .to_string()
}
fn download_helper(remote_storage: &GenericRemoteStorage, remote_from_path: &RemotePath, to_path: &str) -> anyhow::Result<()> {
let file_name = remote_from_path.object_name().expect("it must exist");
info!("Downloading {:?}",file_name);
let mut download = remote_storage.download(&remote_from_path).await?;
let mut write_data_buffer = Vec::new();
download.download_stream.read_to_end(&mut write_data_buffer).await?;
let mut output_file = BufWriter::new(File::create(file_name)?);
output_file.write_all(&mut write_data_buffer)?;
Ok(())
}
/// Selects which set of extension files `download_extension` fetches
/// from remote storage.
pub enum ExtensionType {
Shared, // we just use the public folder here
Tenant(String), // String is tenant_id
Library(String) // String is name of the extension
}
pub async fn download_extension(config: &RemoteStorageConfig, ext_type: ExtensionType) -> anyhow::Result<()>{
let sharedir = get_pg_config("--sharedir");
let sharedir = format!("{}/extension", sharedir);
let libdir = get_pg_config("--libdir");
let remote_storage = GenericRemoteStorage::from_config(config)?;
match ext_type {
ExtensionType::Shared => {
// 1. Download control files from s3-bucket/public/*.control to SHAREDIR/extension
// We can do this step even before we have spec,
// because public extensions are common for all projects.
let folder = RemotePath::new(Path::new("public_extensions"))?;
let from_paths = remote_storage.list_files(Some(&folder)).await?;
for remote_from_path in from_paths {
if remote_from_path.extension() == Some("control") {
// FIXME: CAUTION: if you run this, it will actually write stuff to your postgress directory
// only run if you are ok with that
download_helper(&remote_storage, &remote_from_path, &sharedir)?;
}
}
}
ExtensionType::Tenant(tenant_id) => {
// 2. After we have spec, before project start
// Download control files from s3-bucket/[tenant-id]/*.control to SHAREDIR/extension
let folder = RemotePath::new(Path::new(format!("{tenant_id}")))?;
let from_paths = remote_storage.list_files(Some(&folder)).await?;
for remote_from_path in from_paths {
if remote_from_path.extension() == Some("control") {
download_helper(&remote_storage, &remote_from_path, &sharedir)?;
}
}
}
ExtensionType::Library(library_name) => {
// 3. After we have spec, before postgres start
// Download preload_shared_libraries from s3-bucket/public/[library-name].control into LIBDIR/
let from_path = format!("neon-dev-extensions/public/{library_name}.control");
let remote_from_path = RemotePath::new(Path::new(&from_path))?;
download_helper(&remote_storage, &remote_from_path, &libdir)?;
}
}
Ok(())
}
pub fn get_s3_config() -> anyhow::Result<RemoteStorageConfig> {
// TODO: Right now we are using the same config parameters as pageserver; but should we have our own configs?
// TODO: Should we read the s3_config from CLI arguments?
let cfg_file_path = Path::new("./../.neon/pageserver.toml");
let cfg_file_contents = std::fs::read_to_string(cfg_file_path)
.with_context(|| format!( "Failed to read pageserver config at '{}'", cfg_file_path.display()))?;
let toml = cfg_file_contents
.parse::<toml_edit::Document>()
.with_context(|| format!( "Failed to parse '{}' as pageserver config", cfg_file_path.display()))?;
let remote_storage_data = toml.get("remote_storage")
.context("field should be present")?;
let remote_storage_config = RemoteStorageConfig::from_toml(remote_storage_data)?
.context("error configuring remote storage")?;
Ok(remote_storage_config)
}

View File

@@ -70,6 +70,10 @@ impl RemotePath {
/// Returns a new `RemotePath` with `segment` appended to this one.
pub fn join(&self, segment: &Path) -> Self {
    let joined = self.0.join(segment);
    Self(joined)
}
/// Returns the extension of the final path component as UTF-8, or `None`
/// if there is no extension or it is not valid UTF-8.
pub fn extension(&self) -> Option<&str> {
    self.0.extension().and_then(|ext| ext.to_str())
}
}
/// Storage (potentially remote) API to manage its state.
@@ -86,6 +90,12 @@ pub trait RemoteStorage: Send + Sync + 'static {
prefix: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError>;
/// Lists all files under the given prefix.
async fn list_files(
&self,
prefix: Option<&RemotePath>,
) -> anyhow::Result<Vec<RemotePath>>;
/// Streams the local file contents into remote into the remote storage entry.
async fn upload(
&self,
@@ -161,6 +171,23 @@ pub enum GenericRemoteStorage {
}
impl GenericRemoteStorage {
/// Lists all the files under a "directory".
///
/// Example:
/// `list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"]`
pub async fn list_files(
    &self,
    folder: Option<&RemotePath>,
) -> anyhow::Result<Vec<RemotePath>> {
    // Dispatch to whichever backend this storage was configured with.
    match self {
        Self::AwsS3(storage) => storage.list_files(folder).await,
        Self::LocalFs(storage) => storage.list_files(folder).await,
        Self::Unreliable(storage) => storage.list_files(folder).await,
    }
}
// Lists the common *prefixes*, if any, of the files.
// Example:
// list_prefixes("foo123","foo567","bar123","bar432") = ["foo", "bar"]
pub async fn list_prefixes(
&self,
prefix: Option<&RemotePath>,

View File

@@ -116,6 +116,34 @@ impl RemoteStorage for LocalFs {
.collect())
}
/// Lists the regular files directly inside `folder` (or inside the storage
/// root when `folder` is `None`). Entries that are not plain files
/// (directories, symlinks) are skipped, and subdirectories are not descended
/// into.
///
/// Note: errors are reported as `anyhow::Error`; if a `DownloadError` is
/// needed, map at the call site with
/// `.map_err(|e| DownloadError::Other(anyhow::Error::from(e)))`.
///
/// Fixes relative to the original: dropped the needless `.clone()` of the
/// `Option<&RemotePath>`, and no longer constructs a `RemotePath` that is
/// immediately discarded when `folder` is `Some`.
async fn list_files(
    &self,
    folder: Option<&RemotePath>,
) -> anyhow::Result<Vec<RemotePath>> {
    // Resolve the on-disk directory to scan.
    let full_path = match folder {
        Some(folder) => folder.with_base(&self.storage_root),
        None => self.storage_root.clone(),
    };
    let mut entries = fs::read_dir(full_path).await?;
    let mut files = vec![];
    while let Some(entry) = entries.next_entry().await? {
        if !entry.file_type().await?.is_file() {
            continue;
        }
        let file_name: PathBuf = entry.file_name().into();
        // Report the path relative to the storage root: prefixed by the
        // requested folder, or a bare file name when listing the root.
        let file_remote_path = match folder {
            Some(folder) => folder.join(&file_name),
            None => RemotePath::new(&file_name)?,
        };
        files.push(file_remote_path);
    }
    Ok(files)
}
async fn upload(
&self,
data: impl io::AsyncRead + Unpin + Send + Sync + 'static,

View File

@@ -332,6 +332,53 @@ impl RemoteStorage for S3Bucket {
Ok(document_keys)
}
/// Lists all objects under `folder` in the bucket, following S3 continuation
/// tokens until the listing is exhausted.
///
/// NOTE(review): the S3 prefix is derived from `folder.object_name()`, i.e.
/// only the *last* path segment — a multi-segment folder would list the wrong
/// keys. Confirm whether the full relative path should be used as the prefix.
///
/// Fixes relative to the original: removed leftover `println!` debug output,
/// and replaced the `unwrap()`/`expect()` panic paths with contextual errors.
async fn list_files(
    &self,
    folder: Option<&RemotePath>,
) -> anyhow::Result<Vec<RemotePath>> {
    // TODO: should we use DownloadError error type instead of anyhow::Error?
    let folder_name = folder
        .map(|path| {
            path.object_name()
                .map(String::from)
                .context("invalid folder name")
        })
        .transpose()?;
    let mut continuation_token = None;
    let mut all_files = vec![];
    loop {
        // Throttle concurrent S3 requests; the permit is held for the
        // duration of this iteration's request.
        let _guard = self
            .concurrency_limiter
            .acquire()
            .await
            .context("Concurrency limiter semaphore got closed during S3 list_files")?;
        metrics::inc_list_objects();
        let response = self
            .client
            .list_objects_v2()
            .bucket(self.bucket_name.clone())
            .set_prefix(folder_name.clone())
            .set_continuation_token(continuation_token)
            .set_max_keys(self.max_keys_per_list_response)
            .send()
            .await
            .map_err(|e| {
                metrics::inc_list_objects_fail();
                e
            })
            .context("Failed to list files in S3 bucket")?;
        for object in response.contents().unwrap_or_default() {
            // A listed object should always carry a key; surface an error
            // instead of panicking if the SDK ever returns one without.
            let object_path = object.key().context("object without key in S3 listing")?;
            all_files.push(self.s3_object_to_relative_path(object_path));
        }
        // Keep paging until S3 stops handing out continuation tokens.
        match response.next_continuation_token {
            Some(new_token) => continuation_token = Some(new_token),
            None => break,
        }
    }
    Ok(all_files)
}
async fn upload(
&self,
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,

View File

@@ -82,6 +82,14 @@ impl RemoteStorage for UnreliableWrapper {
self.inner.list_prefixes(prefix).await
}
// Fail-injection wrapper: records an attempt (which may inject a failure)
// before delegating the listing to the inner storage.
// NOTE(review): the attempt is tagged `RemoteOp::ListPrefixes`, not a
// dedicated list-files op — presumably because `RemoteOp` (not visible here)
// has no `ListFiles` variant. Confirm, and add one if failure injection
// should distinguish the two operations.
async fn list_files(
&self,
folder: Option<&RemotePath>
) -> anyhow::Result<Vec<RemotePath>>{
self.attempt(RemoteOp::ListPrefixes(folder.cloned()))?;
self.inner.list_files(folder).await
}
async fn upload(
&self,
data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,