Alek Westover
2023-06-15 10:20:01 -04:00
parent 7465c644b9
commit 214ecacfc4
8 changed files with 60 additions and 27 deletions

Cargo.lock generated
View File

@@ -924,12 +924,14 @@ dependencies = [
"opentelemetry",
"postgres",
"regex",
"remote_storage",
"reqwest",
"serde",
"serde_json",
"tar",
"tokio",
"tokio-postgres",
"toml_edit",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",

View File

@@ -30,3 +30,5 @@ url.workspace = true
compute_api.workspace = true
utils.workspace = true
workspace_hack.workspace = true
toml_edit.workspace = true
remote_storage = { version = "0.1", path = "../libs/remote_storage/" }

View File

@@ -1,10 +1,11 @@
use remote_storage::*;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::path::Path;
use std::str;
use std::{fs, thread};
use toml_edit;
use remote_storage::*;
use std::fs::File;
use std::io::BufWriter;
use anyhow::Context;
use tracing::info;
@@ -12,6 +13,9 @@ use tracing::info;
pub fn download_file(mut stream: TcpStream) -> anyhow::Result<()> {
let mut buf = [0; 512];
println!("ALEK: calling download file");
// fs.write("world.txt", "hello")?;
stream.read(&mut buf).expect("Error reading from stream");
let filename = str::from_utf8(&buf)
@@ -20,7 +24,7 @@ pub fn download_file(mut stream: TcpStream) -> anyhow::Result<()> {
println!("requested file {}", filename);
download_extension(get_s3_config(), ExtensionType::Shared);
// download_extension(get_s3_config(), ExtensionType::Shared);
let from_prefix = "/tmp/from_prefix";
let to_prefix = "/tmp/to_prefix";
@@ -48,24 +52,41 @@ fn get_pg_config(argument: &str) -> String {
stdout.trim().to_string()
}
fn download_helper(remote_storage: &GenericRemoteStorage, remote_from_path: &RemotePath, to_path: &str) -> anyhow::Result<()> {
async fn download_helper(
remote_storage: &GenericRemoteStorage,
remote_from_path: &RemotePath,
to_path: &str,
) -> anyhow::Result<()> {
let file_name = remote_from_path.object_name().expect("it must exist");
info!("Downloading {:?}",file_name);
info!("Downloading {:?}", file_name);
let mut download = remote_storage.download(&remote_from_path).await?;
let mut write_data_buffer = Vec::new();
download.download_stream.read_to_end(&mut write_data_buffer).await?;
let mut write_data_buffer = Vec::new();
download
.download_stream
.read_to_end(&mut write_data_buffer)
.await?;
let mut output_file = BufWriter::new(File::create(file_name)?);
output_file.write_all(&mut write_data_buffer)?;
Ok(())
}
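// Not part of this commit: a hypothetical call-site sketch. download_helper is async
// now, so callers have to await it from an async context; the names below are the
// ones already used in this file.
async fn download_one_file(
remote_storage: &GenericRemoteStorage,
remote_from_path: &RemotePath,
sharedir: &str,
) -> anyhow::Result<()> {
download_helper(remote_storage, remote_from_path, sharedir).await
}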
pub enum ExtensionType {
Shared, // we just use the public folder here
Tenant(String), // String is tenant_id
Library(String) // String is name of the extension
Shared, // we just use the public folder here
Tenant(String), // String is tenant_id
Library(String), // String is name of the extension
}
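// Not part of this commit: a hypothetical sketch of how the variants above could map
// to remote storage prefixes (public folder for Shared, a tenant-scoped folder for
// Tenant, a per-library folder for Library). The actual layout is whatever
// download_extension ends up using; the prefix strings here are made up.
fn example_remote_prefix(ext_type: &ExtensionType) -> String {
match ext_type {
ExtensionType::Shared => "public/extensions".to_string(),
ExtensionType::Tenant(tenant_id) => format!("{}/extensions", tenant_id),
ExtensionType::Library(library_name) => format!("libraries/{}", library_name),
}
}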
pub async fn download_extension(config: &RemoteStorageConfig, ext_type: ExtensionType) -> anyhow::Result<()>{
/*
separate storage and compute
storage: pageserver stores pages, accepts WAL, communicates with S3.
compute: postgres, runs in Kubernetes / a VM, started by compute_ctl (a Rust service); accepts a spec.
pass config to compute_ctl
*/
pub async fn download_extension(
config: &RemoteStorageConfig,
ext_type: ExtensionType,
) -> anyhow::Result<()> {
let sharedir = get_pg_config("--sharedir");
let sharedir = format!("{}/extension", sharedir);
let libdir = get_pg_config("--libdir");
@@ -80,8 +101,8 @@ pub async fn download_extension(config: &RemoteStorageConfig, ext_type: Extensio
let from_paths = remote_storage.list_files(Some(&folder)).await?;
for remote_from_path in from_paths {
if remote_from_path.extension() == Some("control") {
// FIXME: CAUTION: if you run this, it will actually write stuff to your postgres directory
// only run if you are ok with that
// NOTE: if you run this, it will actually write stuff to your postgres directory
// only run if you are ok with that. TODO: delete this comment
download_helper(&remote_storage, &remote_from_path, &sharedir)?;
}
}
@@ -112,15 +133,24 @@ pub fn get_s3_config() -> anyhow::Result<RemoteStorageConfig> {
// TODO: Right now we are using the same config parameters as the pageserver, but should we have our own configs?
// TODO: Should we read the s3_config from CLI arguments?
let cfg_file_path = Path::new("./../.neon/pageserver.toml");
let cfg_file_contents = std::fs::read_to_string(cfg_file_path)
.with_context(|| format!( "Failed to read pageserver config at '{}'", cfg_file_path.display()))?;
let cfg_file_contents = std::fs::read_to_string(cfg_file_path).with_context(|| {
format!(
"Failed to read pageserver config at '{}'",
cfg_file_path.display()
)
})?;
let toml = cfg_file_contents
.parse::<toml_edit::Document>()
.with_context(|| format!( "Failed to parse '{}' as pageserver config", cfg_file_path.display()))?;
let remote_storage_data = toml.get("remote_storage")
.with_context(|| {
format!(
"Failed to parse '{}' as pageserver config",
cfg_file_path.display()
)
})?;
let remote_storage_data = toml
.get("remote_storage")
.context("field should be present")?;
let remote_storage_config = RemoteStorageConfig::from_toml(remote_storage_data)?
.context("error configuring remote storage")?;
Ok(remote_storage_config)
}
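// Not part of this commit: a sketch of the kind of `remote_storage` table that
// get_s3_config expects to find in pageserver.toml, parsed through the same
// toml_edit / RemoteStorageConfig::from_toml path. The `local_path` field is an
// assumption (a local-filesystem backend); the real schema is whatever from_toml accepts.
fn example_parse_remote_storage() -> anyhow::Result<RemoteStorageConfig> {
let cfg = r#"
[remote_storage]
local_path = "/tmp/local_fs_remote_storage"
"#;
let toml = cfg.parse::<toml_edit::Document>()?;
let remote_storage_data = toml
.get("remote_storage")
.context("field should be present")?;
RemoteStorageConfig::from_toml(remote_storage_data)?.context("error configuring remote storage")
}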

View File

@@ -17,6 +17,8 @@ use tokio::task;
use tracing::{error, info};
use tracing_utils::http::OtelName;
use crate::extension_server;
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
ComputeStatusResponse {
start_time: state.start_time,
@@ -133,7 +135,8 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
filename
);
match download_file(&filename).await {
// TODO: left off here...
match extension_server::download_file(&filename).await {
Ok(_) => Response::new(Body::from("OK")),
Err(e) => {
error!("download_file failed: {}", e);

View File

@@ -9,6 +9,7 @@ pub mod http;
#[macro_use]
pub mod logger;
pub mod compute;
pub mod extension_server;
pub mod monitor;
pub mod params;
pub mod pg_helpers;

View File

@@ -90,10 +90,10 @@ pub trait RemoteStorage: Send + Sync + 'static {
prefix: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError>;
/// Lists all files in a subdirectory
/// Lists all files in a folder
async fn list_files(
&self,
prefix: Option<&RemotePath>,
folder: Option<&RemotePath>,
) -> anyhow::Result<Vec<RemotePath>>;
/// Streams the local file contents into the remote storage entry.

View File

@@ -336,7 +336,6 @@ impl RemoteStorage for S3Bucket {
&self,
folder: Option<&RemotePath>
) -> anyhow::Result<Vec<RemotePath>>{
// TODO: should we use DownloadError error type instead of anyhow::Error?
let folder_name =
folder.map(|x| String::from(x.object_name().expect("invalid folder name")));
@@ -348,7 +347,6 @@ impl RemoteStorage for S3Bucket {
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 list_files")?;
metrics::inc_list_objects();
let response = self
.client
@@ -359,15 +357,10 @@ impl RemoteStorage for S3Bucket {
.set_max_keys(self.max_keys_per_list_response)
.send()
.await
.map_err(|e| {
metrics::inc_list_objects_fail();
e
})
.context("Failed to list files in S3 bucket")?;
for object in response.contents().unwrap_or_default() {
let object_path = object.key().unwrap();
println!("{:?}", object_path);
let remote_path = self.s3_object_to_relative_path(object_path);
all_files.push(remote_path);
}

View File

@@ -44,6 +44,8 @@ def test_file_download(
env.neon_cli.create_timeline("test_file_download", tenant_id=tenant)
endpoint = env.endpoints.create_start("test_file_download", tenant_id=tenant)
# download the control file from MockS3
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE EXTENSION test_load")