refactoring a bit

This commit is contained in:
Alek Westover
2023-06-21 11:32:04 -04:00
parent 4a35f29301
commit 02a1d4d8c1
6 changed files with 35 additions and 62 deletions

View File

@@ -1 +1 @@
[RemotePath("tenants/d18ac7eff5c6c14559041ac7b8a94506/timelines/87ce5cd6c70a22502fbbc408374e437c/000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000169B098-000000000169B111"), RemotePath("tenants/d18ac7eff5c6c14559041ac7b8a94506/timelines/87ce5cd6c70a22502fbbc408374e437c/index_part.json"), RemotePath("tenants/d18ac7eff5c6c14559041ac7b8a94506/timelines/ac5a22da4cca24f99a359ea01970d5e6/000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000169B098-000000000169B111"), RemotePath("tenants/d18ac7eff5c6c14559041ac7b8a94506/timelines/ac5a22da4cca24f99a359ea01970d5e6/index_part.json"), RemotePath("tenants/d76683c8997d65d455a61c195ead377c/timelines/38134bfd974b3de3b6f8b38726cd944d/000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000169B098-000000000169B111"), RemotePath("tenants/d76683c8997d65d455a61c195ead377c/timelines/38134bfd974b3de3b6f8b38726cd944d/index_part.json"), RemotePath("v15/share/extension/test_ext.control")]
[RemotePath("tenants/9a8ce821f7946ed2f2d58f51f2595024/timelines/62cdc8653864e444171faac9a5c3cea9/000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000169B098-000000000169B111"), RemotePath("tenants/9a8ce821f7946ed2f2d58f51f2595024/timelines/62cdc8653864e444171faac9a5c3cea9/index_part.json"), RemotePath("tenants/a9982e09ea4a00aff9c61daf12744098/timelines/55efa350b38ff1a9df45726dbaadbb9f/000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000169B098-000000000169B111"), RemotePath("tenants/a9982e09ea4a00aff9c61daf12744098/timelines/55efa350b38ff1a9df45726dbaadbb9f/index_part.json"), RemotePath("tenants/a9982e09ea4a00aff9c61daf12744098/timelines/a0af9d76650e3477b7bd1a1e8e2793bf/000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000169B098-000000000169B111"), RemotePath("tenants/a9982e09ea4a00aff9c61daf12744098/timelines/a0af9d76650e3477b7bd1a1e8e2793bf/index_part.json"), RemotePath("v15/share/extension/test_ext.control")]

View File

@@ -27,7 +27,8 @@
//! compute_ctl -D /var/db/postgres/compute \
//! -C 'postgresql://cloud_admin@localhost/postgres' \
//! -S /var/db/postgres/specs/current.json \
//! -b /usr/local/bin/postgres
//! -b /usr/local/bin/postgres \
//! -r {"bucket": "my-bucket", "region": "eu-central-1"}
//! ```
//!
use std::collections::HashMap;
@@ -41,7 +42,6 @@ use std::{thread, time::Duration};
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Arg;
use serde_json::{self, Value};
use tracing::{error, info};
use url::Url;
@@ -49,6 +49,7 @@ use compute_api::responses::ComputeStatus;
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_tools::configurator::launch_configurator;
use compute_tools::extension_server::download_file;
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
@@ -65,30 +66,12 @@ fn main() -> Result<()> {
let remote_ext_config = matches
.get_one::<String>("remote-ext-config")
.expect("remote-extension-config is required");
let remote_ext_config: serde_json::Value = serde_json::from_str(&remote_ext_config)?;
let remote_ext_bucket = match &remote_ext_config["bucket"] {
Value::String(x) => x,
_ => panic!("oops"),
};
let remote_ext_region = match &remote_ext_config["region"] {
Value::String(x) => x,
_ => panic!("oops"),
};
let remote_ext_endpoint = match &remote_ext_config["endpoint"] {
Value::String(x) => x,
_ => panic!("oops"),
};
let rt = Runtime::new().unwrap();
rt.block_on(async move {
compute_tools::extension_server::download_file(
"test_ext.control",
remote_ext_bucket.into(),
remote_ext_region.into(),
remote_ext_endpoint.into(),
)
.await
.expect("download should work");
download_file("test_ext.control", remote_ext_config)
.await
.expect("download should work");
});
let http_port = *matches
@@ -208,9 +191,7 @@ fn main() -> Result<()> {
live_config_allowed,
state: Mutex::new(new_state),
state_changed: Condvar::new(),
remote_ext_bucket: remote_ext_bucket.clone(), // TODO: pass more configurations?
remote_ext_region: remote_ext_region.clone(),
remote_ext_endpoint: remote_ext_endpoint.clone(),
remote_ext_config: remote_ext_config.clone(),
};
let compute = Arc::new(compute_node);

View File

@@ -46,10 +46,7 @@ pub struct ComputeNode {
/// `Condvar` to allow notifying waiters about state changes.
pub state_changed: Condvar,
// S3 configuration variables:
// TODO: pass more args here?
pub remote_ext_bucket: String,
pub remote_ext_region: String,
pub remote_ext_endpoint: String,
pub remote_ext_config: String,
}
#[derive(Clone, Debug)]

View File

@@ -1,5 +1,6 @@
use anyhow::{self};
use remote_storage::*;
use serde_json::{self, Value};
use std::fs::File;
use std::io::{BufWriter, Write};
use std::num::{NonZeroU32, NonZeroUsize};
@@ -8,14 +9,9 @@ use std::str;
use tokio::io::AsyncReadExt;
use tracing::info;
pub async fn download_file(
filename: &str,
remote_ext_bucket: String,
remote_ext_region: String,
remote_ext_endpoint: String,
) -> anyhow::Result<()> {
println!("requested file {}", filename);
let s3_config = create_s3_config(remote_ext_bucket, remote_ext_region, remote_ext_endpoint);
// TODO: get rid of this function by making s3_config part of ComputeNode
pub async fn download_file(filename: &str, remote_ext_config: &str) -> anyhow::Result<()> {
let s3_config = create_s3_config(remote_ext_config)?;
download_extension(&s3_config, ExtensionType::Shared).await?;
Ok(())
}
@@ -107,23 +103,33 @@ pub async fn download_extension(
Ok(())
}
// TODO: add support for more of these parameters being configurable?
pub fn create_s3_config(
remote_ext_bucket: String,
remote_ext_region: String,
remote_ext_endpoint: String,
) -> RemoteStorageConfig {
pub fn create_s3_config(remote_ext_config: &str) -> anyhow::Result<RemoteStorageConfig> {
let remote_ext_config: serde_json::Value = serde_json::from_str(remote_ext_config)?;
let remote_ext_bucket = match &remote_ext_config["bucket"] {
Value::String(x) => x,
_ => panic!("oops"),
};
let remote_ext_region = match &remote_ext_config["region"] {
Value::String(x) => x,
_ => panic!("oops"),
};
let remote_ext_endpoint = match &remote_ext_config["endpoint"] {
Value::String(x) => Some(x.clone()),
_ => None,
};
// TODO: add support for more of these parameters being configurable?
let config = S3Config {
bucket_name: remote_ext_bucket,
bucket_region: remote_ext_region,
bucket_name: remote_ext_bucket.clone(),
bucket_region: remote_ext_region.clone(),
prefix_in_bucket: None,
endpoint: Some(remote_ext_endpoint),
endpoint: remote_ext_endpoint,
concurrency_limit: NonZeroUsize::new(100).expect("100 != 0"),
max_keys_per_list_response: None,
};
RemoteStorageConfig {
Ok(RemoteStorageConfig {
max_concurrent_syncs: NonZeroUsize::new(100).expect("100 != 0"),
max_sync_errors: NonZeroU32::new(100).expect("100 != 0"),
storage: RemoteStorageKind::AwsS3(config),
}
})
}

View File

@@ -134,15 +134,7 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
filename
);
match extension_server::download_file(
filename,
// TODO: pass more remote_ext arguments?
compute.remote_ext_bucket.clone(),
compute.remote_ext_region.clone(),
compute.remote_ext_endpoint.clone(),
)
.await
{
match extension_server::download_file(filename, &compute.remote_ext_config).await {
Ok(_) => Response::new(Body::from("OK")),
Err(e) => {
error!("download_file failed: {}", e);

View File

@@ -1,11 +1,8 @@
from contextlib import closing
from typing import List
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
RemoteStorageKind,
available_remote_storages,
)
import json