mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 05:50:38 +00:00
Merge pull request #3679 from neondatabase/releases/2023-02-22
Releases/2023-02-22
This commit is contained in:
@@ -32,8 +32,6 @@ storage:
|
||||
hosts:
|
||||
safekeeper-0.ap-southeast-1.aws.neon.tech:
|
||||
ansible_host: i-0d6f1dc5161eef894
|
||||
safekeeper-1.ap-southeast-1.aws.neon.tech:
|
||||
ansible_host: i-0e338adda8eb2d19f
|
||||
safekeeper-2.ap-southeast-1.aws.neon.tech:
|
||||
ansible_host: i-04fb63634e4679eb9
|
||||
safekeeper-3.ap-southeast-1.aws.neon.tech:
|
||||
|
||||
2
.github/workflows/deploy-dev.yml
vendored
2
.github/workflows/deploy-dev.yml
vendored
@@ -67,7 +67,7 @@ jobs:
|
||||
./get_binaries.sh
|
||||
|
||||
ansible-galaxy collection install sivel.toiletwater
|
||||
ansible-playbook deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_STAGING_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
|
||||
ansible-playbook -v deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_STAGING_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
|
||||
rm -f neon_install.tar.gz .neon_current_version
|
||||
|
||||
- name: Cleanup ansible folder
|
||||
|
||||
2
.github/workflows/deploy-prod.yml
vendored
2
.github/workflows/deploy-prod.yml
vendored
@@ -68,7 +68,7 @@ jobs:
|
||||
./get_binaries.sh
|
||||
|
||||
ansible-galaxy collection install sivel.toiletwater
|
||||
ansible-playbook deploy.yaml -i prod.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_PRODUCTION_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
|
||||
ansible-playbook -v deploy.yaml -i prod.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_PRODUCTION_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }}
|
||||
rm -f neon_install.tar.gz .neon_current_version
|
||||
|
||||
deploy-proxy-prod-new:
|
||||
|
||||
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -3054,6 +3054,7 @@ dependencies = [
|
||||
"hyper",
|
||||
"metrics",
|
||||
"once_cell",
|
||||
"pin-project-lite",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tempfile",
|
||||
|
||||
@@ -21,7 +21,7 @@ toml_edit.workspace = true
|
||||
tracing.workspace = true
|
||||
metrics.workspace = true
|
||||
utils.workspace = true
|
||||
|
||||
pin-project-lite.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
|
||||
@@ -20,7 +20,10 @@ use aws_sdk_s3::{
|
||||
};
|
||||
use aws_smithy_http::body::SdkBody;
|
||||
use hyper::Body;
|
||||
use tokio::{io, sync::Semaphore};
|
||||
use tokio::{
|
||||
io::{self, AsyncRead},
|
||||
sync::Semaphore,
|
||||
};
|
||||
use tokio_util::io::ReaderStream;
|
||||
use tracing::debug;
|
||||
|
||||
@@ -102,7 +105,7 @@ pub struct S3Bucket {
|
||||
// Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
|
||||
// Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
|
||||
// The helps to ensure we don't exceed the thresholds.
|
||||
concurrency_limiter: Semaphore,
|
||||
concurrency_limiter: Arc<Semaphore>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
@@ -162,7 +165,7 @@ impl S3Bucket {
|
||||
client,
|
||||
bucket_name: aws_config.bucket_name.clone(),
|
||||
prefix_in_bucket,
|
||||
concurrency_limiter: Semaphore::new(aws_config.concurrency_limit.get()),
|
||||
concurrency_limiter: Arc::new(Semaphore::new(aws_config.concurrency_limit.get())),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -194,9 +197,10 @@ impl S3Bucket {
|
||||
}
|
||||
|
||||
async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
|
||||
let _guard = self
|
||||
let permit = self
|
||||
.concurrency_limiter
|
||||
.acquire()
|
||||
.clone()
|
||||
.acquire_owned()
|
||||
.await
|
||||
.context("Concurrency limiter semaphore got closed during S3 download")
|
||||
.map_err(DownloadError::Other)?;
|
||||
@@ -217,9 +221,10 @@ impl S3Bucket {
|
||||
let metadata = object_output.metadata().cloned().map(StorageMetadata);
|
||||
Ok(Download {
|
||||
metadata,
|
||||
download_stream: Box::pin(io::BufReader::new(
|
||||
download_stream: Box::pin(io::BufReader::new(RatelimitedAsyncRead::new(
|
||||
permit,
|
||||
object_output.body.into_async_read(),
|
||||
)),
|
||||
))),
|
||||
})
|
||||
}
|
||||
Err(SdkError::ServiceError {
|
||||
@@ -240,6 +245,32 @@ impl S3Bucket {
|
||||
}
|
||||
}
|
||||
|
||||
pin_project_lite::pin_project! {
|
||||
/// An `AsyncRead` adapter which carries a permit for the lifetime of the value.
|
||||
struct RatelimitedAsyncRead<S> {
|
||||
permit: tokio::sync::OwnedSemaphorePermit,
|
||||
#[pin]
|
||||
inner: S,
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncRead> RatelimitedAsyncRead<S> {
|
||||
fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self {
|
||||
RatelimitedAsyncRead { permit, inner }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncRead> AsyncRead for RatelimitedAsyncRead<S> {
|
||||
fn poll_read(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
buf: &mut io::ReadBuf<'_>,
|
||||
) -> std::task::Poll<std::io::Result<()>> {
|
||||
let this = self.project();
|
||||
this.inner.poll_read(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl RemoteStorage for S3Bucket {
|
||||
async fn list(&self) -> anyhow::Result<Vec<RemotePath>> {
|
||||
|
||||
@@ -74,7 +74,7 @@ async fn compaction_loop(tenant_id: TenantId) {
|
||||
let period = tenant.get_compaction_period();
|
||||
|
||||
// TODO: we shouldn't need to await to find tenant and this could be moved outside of
|
||||
// loop
|
||||
// loop, #3501. There are also additional "allowed_errors" in tests.
|
||||
if first {
|
||||
first = false;
|
||||
if random_init_delay(period, &cancel).await.is_err() {
|
||||
|
||||
@@ -1770,15 +1770,9 @@ impl Timeline {
|
||||
let calculation = async {
|
||||
let cancel = cancel.child_token();
|
||||
let ctx = ctx.attached_child();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
// Run in a separate thread since this can do a lot of
|
||||
// synchronous file IO without .await inbetween
|
||||
// if there are no RemoteLayers that would require downloading.
|
||||
let h = tokio::runtime::Handle::current();
|
||||
h.block_on(self_calculation.calculate_logical_size(init_lsn, cancel, &ctx))
|
||||
})
|
||||
.await
|
||||
.context("Failed to spawn calculation result task")?
|
||||
self_calculation
|
||||
.calculate_logical_size(init_lsn, cancel, &ctx)
|
||||
.await
|
||||
};
|
||||
let timeline_state_cancellation = async {
|
||||
loop {
|
||||
@@ -1811,7 +1805,7 @@ impl Timeline {
|
||||
tokio::pin!(calculation);
|
||||
loop {
|
||||
tokio::select! {
|
||||
res = &mut calculation => { return res }
|
||||
res = &mut calculation => { return res }
|
||||
reason = timeline_state_cancellation => {
|
||||
debug!(reason = reason, "cancelling calculation");
|
||||
cancel.cancel();
|
||||
@@ -3745,6 +3739,7 @@ impl Timeline {
|
||||
remote_layer.ongoing_download.close();
|
||||
} else {
|
||||
// Keep semaphore open. We'll drop the permit at the end of the function.
|
||||
info!("on-demand download failed: {:?}", result.as_ref().unwrap_err());
|
||||
}
|
||||
|
||||
// Don't treat it as an error if the task that triggered the download
|
||||
|
||||
@@ -2080,6 +2080,8 @@ class NeonPageserver(PgProtocol):
|
||||
".*query handler for 'pagestream.*failed: Timeline .* was not found", # postgres reconnects while timeline_delete doesn't hold the tenant's timelines.lock()
|
||||
".*query handler for 'pagestream.*failed: Timeline .* is not active", # timeline delete in progress
|
||||
".*task iteration took longer than the configured period.*",
|
||||
# this is until #3501
|
||||
".*Compaction failed, retrying in [^:]+: Cannot run compaction iteration on inactive tenant",
|
||||
]
|
||||
|
||||
def start(
|
||||
|
||||
Reference in New Issue
Block a user