Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-25 22:30:38 +00:00)

Compare commits: release-42...hack/compu (6 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 1bf5e07da1 | |
| | 87de91b004 | |
| | 2f217f9ebd | |
| | 71491dd467 | |
| | 67e791c4ec | |
| | 517782ab94 | |
Cargo.lock (generated): 10 changed lines
@@ -3221,7 +3221,7 @@ dependencies = [
 [[package]]
 name = "postgres"
 version = "0.19.4"
-source = "git+https://github.com/neondatabase/rust-postgres.git?rev=6ce32f791526e27533cab0232a6bb243b2c32584#6ce32f791526e27533cab0232a6bb243b2c32584"
+source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
 dependencies = [
  "bytes",
  "fallible-iterator",
@@ -3234,7 +3234,7 @@ dependencies = [
 [[package]]
 name = "postgres-native-tls"
 version = "0.5.0"
-source = "git+https://github.com/neondatabase/rust-postgres.git?rev=6ce32f791526e27533cab0232a6bb243b2c32584#6ce32f791526e27533cab0232a6bb243b2c32584"
+source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
 dependencies = [
  "native-tls",
  "tokio",
@@ -3245,7 +3245,7 @@ dependencies = [
 [[package]]
 name = "postgres-protocol"
 version = "0.6.4"
-source = "git+https://github.com/neondatabase/rust-postgres.git?rev=6ce32f791526e27533cab0232a6bb243b2c32584#6ce32f791526e27533cab0232a6bb243b2c32584"
+source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
 dependencies = [
  "base64 0.20.0",
  "byteorder",
@@ -3263,7 +3263,7 @@ dependencies = [
 [[package]]
 name = "postgres-types"
 version = "0.2.4"
-source = "git+https://github.com/neondatabase/rust-postgres.git?rev=6ce32f791526e27533cab0232a6bb243b2c32584#6ce32f791526e27533cab0232a6bb243b2c32584"
+source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
 dependencies = [
  "bytes",
  "fallible-iterator",
@@ -4933,7 +4933,7 @@ dependencies = [
 [[package]]
 name = "tokio-postgres"
 version = "0.7.7"
-source = "git+https://github.com/neondatabase/rust-postgres.git?rev=6ce32f791526e27533cab0232a6bb243b2c32584#6ce32f791526e27533cab0232a6bb243b2c32584"
+source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
 dependencies = [
  "async-trait",
  "byteorder",
Cargo.toml: 12 changed lines
@@ -165,11 +165,11 @@ env_logger = "0.10"
 log = "0.4"
 
 ## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="6ce32f791526e27533cab0232a6bb243b2c32584" }
-postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="6ce32f791526e27533cab0232a6bb243b2c32584" }
-postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="6ce32f791526e27533cab0232a6bb243b2c32584" }
-postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="6ce32f791526e27533cab0232a6bb243b2c32584" }
-tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="6ce32f791526e27533cab0232a6bb243b2c32584" }
+postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
+postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
+postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
+postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
+tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
 
 ## Other git libraries
 heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending
@@ -206,7 +206,7 @@ tonic-build = "0.9"
 
 # This is only needed for proxy's tests.
 # TODO: we should probably fork `tokio-postgres-rustls` instead.
-tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="6ce32f791526e27533cab0232a6bb243b2c32584" }
+tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
 
 ################# Binary contents sections
 
@@ -714,6 +714,23 @@ RUN wget https://github.com/pksunkara/pgx_ulid/archive/refs/tags/v0.1.3.tar.gz -
     cargo pgrx install --release && \
     echo "trusted = true" >> /usr/local/pgsql/share/extension/ulid.control
 
+#########################################################################################
+#
+# Layer "pg-wait-sampling-pg-build"
+# compile pg_wait_sampling extension
+#
+#########################################################################################
+FROM build-deps AS pg-wait-sampling-pg-build
+COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
+
+ENV PATH "/usr/local/pgsql/bin/:$PATH"
+RUN wget https://github.com/postgrespro/pg_wait_sampling/archive/refs/tags/v1.1.5.tar.gz -O pg_wait_sampling.tar.gz && \
+    echo 'a03da6a413f5652ce470a3635ed6ebba528c74cb26aa4cfced8aff8a8441f81ec6dd657ff62cd6ce96a4e6ce02cad9f2519ae9525367ece60497aa20faafde5c pg_wait_sampling.tar.gz' | sha512sum -c && \
+    mkdir pg_wait_sampling-src && cd pg_wait_sampling-src && tar xvzf ../pg_wait_sampling.tar.gz --strip-components=1 -C . && \
+    make USE_PGXS=1 -j $(getconf _NPROCESSORS_ONLN) && \
+    make USE_PGXS=1 -j $(getconf _NPROCESSORS_ONLN) install && \
+    echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_wait_sampling.control
+
 #########################################################################################
 #
 # Layer "neon-pg-ext-build"
@@ -750,6 +767,7 @@ COPY --from=rdkit-pg-build /usr/local/pgsql/ /usr/local/pgsql/
 COPY --from=pg-uuidv7-pg-build /usr/local/pgsql/ /usr/local/pgsql/
 COPY --from=pg-roaringbitmap-pg-build /usr/local/pgsql/ /usr/local/pgsql/
 COPY --from=pg-embedding-pg-build /usr/local/pgsql/ /usr/local/pgsql/
+COPY --from=pg-wait-sampling-pg-build /usr/local/pgsql/ /usr/local/pgsql/
 COPY pgxn/ pgxn/
 
 RUN make -j $(getconf _NPROCESSORS_ONLN) \
@@ -479,6 +479,13 @@ fn cli() -> clap::Command {
                 )
                 .value_name("FILECACHE_CONNSTR"),
         )
+        .arg(
+            // DEPRECATED, NO LONGER DOES ANYTHING.
+            // See https://github.com/neondatabase/cloud/issues/7516
+            Arg::new("file-cache-on-disk")
+                .long("file-cache-on-disk")
+                .action(clap::ArgAction::SetTrue),
+        )
 }
 
 #[test]
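The hunk above keeps a deprecated flag parseable so existing invocations don't break: the flag is still accepted, just never acted on. A minimal standalone sketch of the same pattern with clap (hypothetical binary name, not the actual compute_ctl code):

```rust
use clap::{Arg, ArgAction, Command};

fn main() {
    let matches = Command::new("example")
        .arg(
            // Deprecated no-op: still accepted so old command lines keep
            // working, but the parsed value is never read.
            Arg::new("file-cache-on-disk")
                .long("file-cache-on-disk")
                .action(ArgAction::SetTrue),
        )
        .get_matches();

    // Deliberately unused; the flag no longer does anything.
    let _ = matches.get_flag("file-cache-on-disk");
}
```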
@@ -638,7 +638,7 @@ const STORAGE_IO_TIME_BUCKETS: &[f64] = &[
 ///
 /// Operations:
 /// - open ([`std::fs::OpenOptions::open`])
-/// - close (dropping [`crate::virtual_file::VirtualFile`])
+/// - close (dropping [`std::fs::File`])
 /// - close-by-replace (close by replacement algorithm)
 /// - read (`read_at`)
 /// - write (`write_at`)
@@ -291,16 +291,6 @@ impl From<harness::TestRedoManager> for WalRedoManager {
 }
 
 impl WalRedoManager {
-    pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
-        match self {
-            Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
-            #[cfg(test)]
-            Self::Test(_) => {
-                // Not applicable to test redo manager
-            }
-        }
-    }
-
     pub async fn request_redo(
         &self,
         key: crate::repository::Key,
@@ -1659,16 +1649,22 @@ impl Tenant {
     /// This function is periodically called by compactor task.
     /// Also it can be explicitly requested per timeline through page server
     /// api's 'compact' command.
-    async fn compaction_iteration(
+    pub async fn compaction_iteration(
         &self,
         cancel: &CancellationToken,
         ctx: &RequestContext,
-    ) -> anyhow::Result<(), timeline::CompactionError> {
-        // Don't start doing work during shutdown, or when broken, we do not need those in the logs
-        if !self.is_active() {
+    ) -> anyhow::Result<()> {
+        // Don't start doing work during shutdown
+        if let TenantState::Stopping { .. } = self.current_state() {
             return Ok(());
         }
 
+        // We should only be called once the tenant has activated.
+        anyhow::ensure!(
+            self.is_active(),
+            "Cannot run compaction iteration on inactive tenant"
+        );
+
         {
             let conf = self.tenant_conf.read().unwrap();
             if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() {
@@ -289,9 +289,7 @@ impl DeltaLayer {
     async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
         let path = self.path();
 
-        let loaded = DeltaLayerInner::load(&path, None, ctx)
-            .await
-            .and_then(|res| res)?;
+        let loaded = DeltaLayerInner::load(&path, None, ctx).await?;
 
         // not production code
         let actual_filename = path.file_name().unwrap().to_owned();
@@ -612,28 +610,18 @@ impl Drop for DeltaLayerWriter {
 }
 
 impl DeltaLayerInner {
-    /// Returns nested result following Result<Result<_, OpErr>, Critical>:
-    /// - inner has the success or transient failure
-    /// - outer has the permanent failure
     pub(super) async fn load(
         path: &Utf8Path,
         summary: Option<Summary>,
         ctx: &RequestContext,
-    ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
-        let file = match VirtualFile::open(path).await {
-            Ok(file) => file,
-            Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
-        };
+    ) -> anyhow::Result<Self> {
+        let file = VirtualFile::open(path)
+            .await
+            .with_context(|| format!("Failed to open file '{path}'"))?;
         let file = FileBlockReader::new(file);
 
-        let summary_blk = match file.read_blk(0, ctx).await {
-            Ok(blk) => blk,
-            Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
-        };
-
-        // TODO: this should be an assertion instead; see ImageLayerInner::load
-        let actual_summary =
-            Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
+        let summary_blk = file.read_blk(0, ctx).await?;
+        let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
 
         if let Some(mut expected_summary) = summary {
             // production code path
@@ -648,11 +636,11 @@ impl DeltaLayerInner {
             }
         }
 
-        Ok(Ok(DeltaLayerInner {
+        Ok(DeltaLayerInner {
             file,
             index_start_blk: actual_summary.index_start_blk,
             index_root_blk: actual_summary.index_root_blk,
-        }))
+        })
     }
 
     pub(super) async fn get_value_reconstruct_data(
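The doc comment removed above describes the nested-result convention being dropped: the outer `Result` carries permanent failures, the inner one transient failures, and `load_inner` flattened the two layers with `.and_then(|res| res)`. A minimal sketch of that shape, with `String` standing in for both error types (illustration only, not the pageserver code):

```rust
// Outer Err = permanent failure, inner Err = transient failure.
fn load() -> Result<Result<u32, String>, String> {
    Ok(Ok(42))
}

fn load_inner() -> Result<u32, String> {
    // `.and_then(|res| res)` flattens Ok(Ok(v)) to Ok(v) and Ok(Err(e)) to
    // Err(e); an outer Err passes through unchanged. Dropping the nesting,
    // as this diff does, turns the call site into a plain `load()?`.
    load().and_then(|res| res)
}

fn main() {
    assert_eq!(load_inner(), Ok(42));
}
```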
@@ -249,9 +249,7 @@ impl ImageLayer {
     async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
         let path = self.path();
 
-        let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, ctx)
-            .await
-            .and_then(|res| res)?;
+        let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, ctx).await?;
 
         // not production code
         let actual_filename = path.file_name().unwrap().to_owned();
@@ -297,31 +295,18 @@ impl ImageLayer {
 }
 
 impl ImageLayerInner {
-    /// Returns nested result following Result<Result<_, OpErr>, Critical>:
-    /// - inner has the success or transient failure
-    /// - outer has the permanent failure
     pub(super) async fn load(
         path: &Utf8Path,
         lsn: Lsn,
         summary: Option<Summary>,
         ctx: &RequestContext,
-    ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
-        let file = match VirtualFile::open(path).await {
-            Ok(file) => file,
-            Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
-        };
+    ) -> anyhow::Result<Self> {
+        let file = VirtualFile::open(path)
+            .await
+            .with_context(|| format!("Failed to open file '{}'", path))?;
         let file = FileBlockReader::new(file);
-        let summary_blk = match file.read_blk(0, ctx).await {
-            Ok(blk) => blk,
-            Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
-        };
-
-        // length is the only way how this could fail, so it's not actually likely at all unless
-        // read_blk returns wrong sized block.
-        //
-        // TODO: confirm and make this into assertion
-        let actual_summary =
-            Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
+        let summary_blk = file.read_blk(0, ctx).await?;
+        let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
 
         if let Some(mut expected_summary) = summary {
             // production code path
@@ -337,12 +322,12 @@ impl ImageLayerInner {
             }
         }
 
-        Ok(Ok(ImageLayerInner {
+        Ok(ImageLayerInner {
             index_start_blk: actual_summary.index_start_blk,
             index_root_blk: actual_summary.index_root_blk,
             lsn,
             file,
-        }))
+        })
     }
 
     pub(super) async fn get_value_reconstruct_data(
@@ -868,9 +868,6 @@ impl LayerInner {
                 }
                 Ok((Err(e), _permit)) => {
-                    // FIXME: this should be with the spawned task and be cancellation sensitive
-                    //
                     // while we should not need this, this backoff has turned out to be useful with
                     // a bug of unexpectedly deleted remote layer file (#5787).
                     let consecutive_failures =
                         self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
                     tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
@@ -1199,7 +1196,7 @@ impl DownloadedLayer {
             ));
             delta_layer::DeltaLayerInner::load(&owner.path, summary, ctx)
                 .await
-                .map(|res| res.map(LayerKind::Delta))
+                .map(LayerKind::Delta)
         } else {
             let lsn = owner.desc.image_layer_lsn();
             let summary = Some(image_layer::Summary::expected(
@@ -1210,32 +1207,23 @@ impl DownloadedLayer {
             ));
             image_layer::ImageLayerInner::load(&owner.path, lsn, summary, ctx)
                 .await
-                .map(|res| res.map(LayerKind::Image))
-            };
-
-            match res {
-                Ok(Ok(layer)) => Ok(Ok(layer)),
-                Ok(Err(transient)) => Err(transient),
-                Err(permanent) => {
-                    LAYER_IMPL_METRICS.inc_permanent_loading_failures();
-                    // TODO(#5815): we are not logging all errors, so temporarily log them **once**
-                    // here as well
-                    let permanent = permanent.context("load layer");
-                    tracing::error!("layer loading failed permanently: {permanent:#}");
-                    Ok(Err(permanent))
-                }
-            }
+                .map(LayerKind::Image)
+            }
+            // this will be a permanent failure
+            .context("load layer");
+
+            if let Err(e) = res.as_ref() {
+                LAYER_IMPL_METRICS.inc_permanent_loading_failures();
+                // TODO(#5815): we are not logging all errors, so temporarily log them here as well
+                tracing::error!("layer loading failed permanently: {e:#}");
+            }
+            res
         };
-        self.kind
-            .get_or_try_init(init)
-            // return transient errors using `?`
-            .await?
-            .as_ref()
-            .map_err(|e| {
-                // errors are not clonabled, cannot but stringify
-                // test_broken_timeline matches this string
-                anyhow::anyhow!("layer loading failed: {e:#}")
-            })
+        self.kind.get_or_init(init).await.as_ref().map_err(|e| {
+            // errors are not clonabled, cannot but stringify
+            // test_broken_timeline matches this string
+            anyhow::anyhow!("layer loading failed: {e:#}")
+        })
     }
 
     async fn get_value_reconstruct_data(
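The `get_or_try_init` to `get_or_init` switch above changes caching semantics: with tokio's `OnceCell`, `get_or_try_init` leaves the cell empty when the initializer fails (so a later call can retry a transient error), while `get_or_init` over a closure returning a `Result` stores whatever comes back, making an `Err` permanent. A small sketch of the difference (assuming `tokio::sync::OnceCell`; not the pageserver code):

```rust
use tokio::sync::OnceCell;

#[tokio::main]
async fn main() {
    // get_or_try_init: a failed init leaves the cell unset, so it can be retried.
    let retryable: OnceCell<u32> = OnceCell::new();
    let attempt = retryable
        .get_or_try_init(|| async { Err::<u32, &str>("transient") })
        .await;
    assert!(attempt.is_err());
    assert!(retryable.get().is_none()); // still empty, next call retries

    // get_or_init over a Result: the Err value itself is cached forever.
    let cached: OnceCell<Result<u32, String>> = OnceCell::new();
    let value = cached
        .get_or_init(|| async { Err::<u32, String>("permanent".into()) })
        .await;
    assert!(value.is_err());
    assert!(cached.get().is_some()); // the failure is now the cell's value
}
```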
@@ -180,7 +180,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
                 // Run compaction
                 if let Err(e) = tenant.compaction_iteration(&cancel, &ctx).await {
                     let wait_duration = backoff::exponential_backoff_duration_seconds(
-                        error_run_count + 1,
+                        error_run_count,
                         1.0,
                         MAX_BACKOFF_SECS,
                     );
@@ -198,10 +198,6 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
 
                 warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Compaction);
 
-                // Perhaps we did no work and the walredo process has been idle for some time:
-                // give it a chance to shut down to avoid leaving walredo process running indefinitely.
-                tenant.walredo_mgr.maybe_quiesce(period * 10);
-
                 // Sleep
                 if tokio::time::timeout(sleep_duration, cancel.cancelled())
                     .await
@@ -265,7 +261,7 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
                     .await;
                 if let Err(e) = res {
                     let wait_duration = backoff::exponential_backoff_duration_seconds(
-                        error_run_count + 1,
+                        error_run_count,
                         1.0,
                         MAX_BACKOFF_SECS,
                     );
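Both loops feed an error counter into `backoff::exponential_backoff_duration_seconds`; changing `error_run_count + 1` to `error_run_count` starts the backoff sequence one step earlier. A minimal sketch of how such a helper typically behaves (an assumption for illustration; the real `utils::backoff` signature may differ):

```rust
// Assumed shape: delay grows as base * 2^n, capped at max_seconds.
fn exponential_backoff_duration_seconds(n: u32, base: f64, max_seconds: f64) -> f64 {
    (base * 2f64.powi(n as i32)).min(max_seconds)
}

fn main() {
    // With base = 1.0 and a 30-second cap, consecutive error runs wait
    // 1, 2, 4, 8, 16, 30 seconds. Passing n instead of n + 1 shifts the
    // whole sequence one step shorter.
    for n in 0..6 {
        let wait = exponential_backoff_duration_seconds(n, 1.0, 30.0);
        println!("error run {n}: wait {wait:.0}s");
    }
}
```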
@@ -91,7 +91,6 @@ struct ProcessOutput {
 pub struct PostgresRedoManager {
     tenant_id: TenantId,
     conf: &'static PageServerConf,
-    last_redo_at: std::sync::Mutex<Option<Instant>>,
     redo_process: RwLock<Option<Arc<WalRedoProcess>>>,
 }
 
@@ -188,26 +187,10 @@ impl PostgresRedoManager {
         PostgresRedoManager {
             tenant_id,
             conf,
-            last_redo_at: std::sync::Mutex::default(),
             redo_process: RwLock::new(None),
         }
     }
 
-    /// This type doesn't have its own background task to check for idleness: we
-    /// rely on our owner calling this function periodically in its own housekeeping
-    /// loops.
-    pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
-        if let Ok(g) = self.last_redo_at.try_lock() {
-            if let Some(last_redo_at) = *g {
-                if last_redo_at.elapsed() >= idle_timeout {
-                    drop(g);
-                    let mut guard = self.redo_process.write().unwrap();
-                    *guard = None;
-                }
-            }
-        }
-    }
-
     ///
     /// Process one request for WAL redo using wal-redo postgres
     ///
@@ -222,8 +205,6 @@ impl PostgresRedoManager {
         wal_redo_timeout: Duration,
         pg_version: u32,
     ) -> anyhow::Result<Bytes> {
-        *(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
-
         let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
         const MAX_RETRY_ATTEMPTS: u32 = 1;
         let mut n_attempts = 0u32;
@@ -367,13 +348,12 @@ impl PostgresRedoManager {
                 self.apply_record_neon(key, &mut page, *record_lsn, record)?;
             }
             // Success!
-            let duration = start_time.elapsed();
-
-            // FIXME: using the same metric here creates a bimodal distribution by default, and because
-            // there could be multiple batch sizes this would be N+1 modal.
+            let end_time = Instant::now();
+            let duration = end_time.duration_since(start_time);
             WAL_REDO_TIME.observe(duration.as_secs_f64());
 
             debug!(
-                "neon applied {} WAL records in {} us to reconstruct page image at LSN {}",
+                "neon applied {} WAL records in {} ms to reconstruct page image at LSN {}",
                 records.len(),
                 duration.as_micros(),
                 lsn
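The timing rewrite in the last hunk is behavior-preserving: `start_time.elapsed()` and `Instant::now().duration_since(start_time)` measure the same interval. Note, though, that the reworded `debug!` string now says "ms" while the value printed is still `duration.as_micros()`. A tiny standalone sketch of the equivalence (illustration only):

```rust
use std::time::Instant;

fn main() {
    let start_time = Instant::now();
    let _busy: u64 = (0..1_000_000u64).sum(); // stand-in for WAL redo work

    // Two equivalent measurements of the same interval:
    let duration_a = start_time.elapsed();
    let end_time = Instant::now();
    let duration_b = end_time.duration_since(start_time);

    println!("{} us vs {} us", duration_a.as_micros(), duration_b.as_micros());
}
```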
@@ -1687,9 +1687,9 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
 	if (current_size >= ((uint64) max_cluster_size) * 1024 * 1024)
 		ereport(ERROR,
 				(errcode(ERRCODE_DISK_FULL),
-				 errmsg("could not extend file because project size limit (%d MB) has been exceeded",
+				 errmsg("could not extend file because cluster size limit (%d MB) has been exceeded",
 						max_cluster_size),
-				 errhint("This limit is defined externally by the project size limit, and internally by neon.max_cluster_size GUC")));
+				 errhint("This limit is defined by neon.max_cluster_size GUC")));
 	}
 
 	/*
@@ -248,7 +248,6 @@ impl ConnCfg {
 
         // connect_raw() will not use TLS if sslmode is "disable"
         let (client, connection) = self.0.connect_raw(stream, tls).await?;
-        tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id()));
         let stream = connection.stream.into_inner();
 
         info!(
@@ -514,7 +514,7 @@ pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> compute::ConnCfg
 }
 
 /// Try to connect to the compute node once.
-#[tracing::instrument(name = "connect_once", fields(pid = tracing::field::Empty), skip_all)]
+#[tracing::instrument(name = "connect_once", skip_all)]
 async fn connect_to_compute_once(
     node_info: &console::CachedNodeInfo,
     timeout: time::Duration,
@@ -208,10 +208,6 @@ impl GlobalConnPool {
         } else {
             info!("pool: reusing connection '{conn_info}'");
             client.session.send(session_id)?;
-            tracing::Span::current().record(
-                "pid",
-                &tracing::field::display(client.inner.get_process_id()),
-            );
             latency_timer.pool_hit();
             latency_timer.success();
             return Ok(Client::new(client, pool).await);
@@ -228,12 +224,6 @@ impl GlobalConnPool {
             )
             .await
         };
-        if let Ok(client) = &new_client {
-            tracing::Span::current().record(
-                "pid",
-                &tracing::field::display(client.inner.get_process_id()),
-            );
-        }
 
         match &new_client {
             // clear the hash. it's no longer valid
@@ -267,11 +257,12 @@ impl GlobalConnPool {
             }
             _ => {}
         }
-        let new_client = new_client?;
-        Ok(Client::new(new_client, pool).await)
-
-        // new_client.map(|inner| Client::new(inner, pool).await)
+        Ok(Client::new(new_client?, pool).await)
     }
 
-    fn put(&self, conn_info: &ConnInfo, client: ClientInner) -> anyhow::Result<()> {
+    fn put(&self, conn_info: &ConnInfo, client: ClientInner, pid: i32) -> anyhow::Result<()> {
         let conn_id = client.conn_id;
 
         // We want to hold this open while we return. This ensures that the pool can't close
@@ -315,9 +306,9 @@ impl GlobalConnPool {
 
         // do logging outside of the mutex
         if returned {
-            info!(%conn_id, "pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
+            info!(%conn_id, "pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}, pid={pid}");
         } else {
-            info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}");
+            info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}, pid={pid}");
         }
 
         Ok(())
@@ -394,7 +385,7 @@ impl ConnectMechanism for TokioMechanism<'_> {
 // Wake up the destination if needed. Code here is a bit involved because
 // we reuse the code from the usual proxy and we need to prepare few structures
 // that this code expects.
-#[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
+#[tracing::instrument(skip_all)]
 async fn connect_to_compute(
     config: &config::ProxyConfig,
     conn_info: &ConnInfo,
@@ -461,7 +452,6 @@ async fn connect_to_compute_once(
         .connect_timeout(timeout)
         .connect(tokio_postgres::NoTls)
         .await?;
-    tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id()));
 
     let (tx, mut rx) = tokio::sync::watch::channel(session);
 
@@ -529,6 +519,22 @@ struct ClientInner {
     conn_id: uuid::Uuid,
 }
 
+impl ClientInner {
+    pub async fn get_pid(&mut self) -> anyhow::Result<i32> {
+        let rows = self.inner.query("select pg_backend_pid();", &[]).await?;
+        if rows.len() != 1 {
+            Err(anyhow::anyhow!(
+                "expected 1 row from pg_backend_pid(), got {}",
+                rows.len()
+            ))
+        } else {
+            let pid = rows[0].get(0);
+            info!(%pid, "got pid");
+            Ok(pid)
+        }
+    }
+}
+
 impl Client {
     pub fn metrics(&self) -> Arc<MetricCounter> {
         USAGE_METRICS.register(self.inner.as_ref().unwrap().ids.clone())
@@ -540,6 +546,7 @@ pub struct Client {
     span: Span,
     inner: Option<ClientInner>,
     pool: Option<(ConnInfo, Arc<GlobalConnPool>)>,
+    pid: i32,
 }
 
 pub struct Discard<'a> {
@@ -549,11 +556,12 @@ pub struct Discard<'a> {
 
 impl Client {
     pub(self) async fn new(
-        inner: ClientInner,
+        mut inner: ClientInner,
         pool: Option<(ConnInfo, Arc<GlobalConnPool>)>,
     ) -> Self {
         Self {
             conn_id: inner.conn_id,
+            pid: inner.get_pid().await.unwrap_or(-1),
             inner: Some(inner),
             span: Span::current(),
             pool,
@@ -565,6 +573,7 @@ impl Client {
             pool,
             conn_id,
             span: _,
+            pid: _,
         } = self;
         (
             &mut inner
@@ -621,10 +630,11 @@ impl Drop for Client {
             .expect("client inner should not be removed");
         if let Some((conn_info, conn_pool)) = self.pool.take() {
             let current_span = self.span.clone();
+            let pid = self.pid;
             // return connection to the pool
             tokio::task::spawn_blocking(move || {
                 let _span = current_span.enter();
-                let _ = conn_pool.put(&conn_info, client);
+                let _ = conn_pool.put(&conn_info, client, pid);
             });
         }
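`ClientInner::get_pid` above fetches the backend pid with a plain query, falling back to `-1` in `Client::new` if the query fails. A standalone sketch of the same lookup with `tokio-postgres` (the connection string is hypothetical):

```rust
use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    // Hypothetical connection string, for illustration only.
    let (client, connection) =
        tokio_postgres::connect("host=localhost user=postgres", NoTls).await?;
    // The connection future must be driven for queries to make progress.
    tokio::spawn(connection);

    // pg_backend_pid() returns int4, which maps to i32.
    let row = client.query_one("select pg_backend_pid();", &[]).await?;
    let pid: i32 = row.get(0);
    println!("backend pid = {pid}");

    // Note: tokio_postgres::Client::get_process_id() exposes the same pid
    // from the startup BackendKeyData message without an extra round trip,
    // which is what the removed tracing::Span records relied on.
    Ok(())
}
```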
@@ -250,7 +250,7 @@ pub async fn handle(
     Ok(response)
 }
 
-#[instrument(name = "sql-over-http", fields(pid = tracing::field::Empty), skip_all)]
+#[instrument(name = "sql-over-http", skip_all)]
 async fn handle_inner(
     request: Request<Body>,
     sni_hostname: Option<String>,
@@ -4,15 +4,15 @@ commands:
   - name: cgconfigparser
     user: root
     sysvInitAction: sysinit
-    shell: 'cgconfigparser -l /etc/cgconfig.conf -s 1664'
+    shell: "cgconfigparser -l /etc/cgconfig.conf -s 1664"
   - name: pgbouncer
     user: nobody
     sysvInitAction: respawn
-    shell: '/usr/local/bin/pgbouncer /etc/pgbouncer.ini'
+    shell: "/usr/local/bin/pgbouncer /etc/pgbouncer.ini"
   - name: postgres-exporter
     user: nobody
     sysvInitAction: respawn
-    shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres" /bin/postgres_exporter'
+    shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres" /bin/postgres_exporter --extend.query-path /etc/postgres_exporter_queries.yml'
 shutdownHook: |
   su -p postgres --session-command '/usr/local/bin/pg_ctl stop -D /var/db/postgres/compute/pgdata -m fast --wait -t 10'
 files:
@@ -46,6 +46,21 @@ files:
       }
       memory {}
     }
+  - filename: postgres_exporter_queries.yml
+    content: |
+      postgres_exporter_pg_database_size:
+        query: "SELECT pg_database.datname, pg_database_size(pg_database.datname) as bytes, 42 as fourtytwo FROM pg_database"
+        cache_seconds: 30
+        metrics:
+          - datname:
+              usage: "LABEL"
+              description: "Name of the database"
+          - bytes:
+              usage: "GAUGE"
+              description: "Disk space used by the database"
+          - fourtytwo:
+              usage: "GAUGE"
+              description: "fourtytwo"
 build: |
   # Build cgroup-tools
   #
@@ -114,10 +129,12 @@ merge: |
 
   COPY cgconfig.conf /etc/cgconfig.conf
   COPY pgbouncer.ini /etc/pgbouncer.ini
+  COPY postgres_exporter_queries.yml /etc/postgres_exporter_queries.yml
   RUN set -e \
       && chown postgres:postgres /etc/pgbouncer.ini \
       && chmod 0644 /etc/pgbouncer.ini \
-      && chmod 0644 /etc/cgconfig.conf
+      && chmod 0644 /etc/cgconfig.conf \
+      && chmod 0644 /etc/postgres_exporter_queries.yml
 
   COPY --from=libcgroup-builder /libcgroup-install/bin/* /usr/bin/
   COPY --from=libcgroup-builder /libcgroup-install/lib/* /usr/lib/