fix: force-refresh azure identity token (#10378)

## Problem

Because of https://github.com/Azure/azure-sdk-for-rust/issues/1739, our
identity token file was not being refreshed. This caused our uploads to
start failing when the storage token expired.

## Summary of changes

Drop and recreate the remote storage config every time we upload in
order to force reload the identity token file.
This commit is contained in:
Conrad Ludgate
2025-01-14 12:53:32 +00:00
committed by GitHub
parent a039f8381f
commit df4abd8b14

View File

@@ -187,10 +187,6 @@ pub async fn worker(
let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
let rx = rx.map(RequestData::from);
let storage = GenericRemoteStorage::from_config(&remote_storage_config)
.await
.context("remote storage init")?;
let properties = WriterProperties::builder()
.set_data_page_size_limit(config.parquet_upload_page_size)
.set_compression(config.parquet_upload_compression);
@@ -224,18 +220,18 @@ pub async fn worker(
let rx_disconnect = futures::stream::poll_fn(move |cx| rx_disconnect.poll_recv(cx));
let rx_disconnect = rx_disconnect.map(RequestData::from);
let storage_disconnect =
GenericRemoteStorage::from_config(&disconnect_events_storage_config)
.await
.context("remote storage for disconnect events init")?;
let parquet_config_disconnect = parquet_config.clone();
tokio::try_join!(
worker_inner(storage, rx, parquet_config),
worker_inner(storage_disconnect, rx_disconnect, parquet_config_disconnect)
worker_inner(remote_storage_config, rx, parquet_config),
worker_inner(
disconnect_events_storage_config,
rx_disconnect,
parquet_config_disconnect
)
)
.map(|_| ())
} else {
worker_inner(storage, rx, parquet_config).await
worker_inner(remote_storage_config, rx, parquet_config).await
}
}
@@ -251,18 +247,32 @@ struct ParquetConfig {
test_remote_failures: u64,
}
impl ParquetConfig {
async fn storage(
&self,
storage_config: &RemoteStorageConfig,
) -> anyhow::Result<GenericRemoteStorage> {
let storage = GenericRemoteStorage::from_config(storage_config)
.await
.context("remote storage init")?;
#[cfg(any(test, feature = "testing"))]
if self.test_remote_failures > 0 {
return Ok(GenericRemoteStorage::unreliable_wrapper(
storage,
self.test_remote_failures,
));
}
Ok(storage)
}
}
async fn worker_inner(
storage: GenericRemoteStorage,
storage_config: RemoteStorageConfig,
rx: impl Stream<Item = RequestData>,
config: ParquetConfig,
) -> anyhow::Result<()> {
#[cfg(any(test, feature = "testing"))]
let storage = if config.test_remote_failures > 0 {
GenericRemoteStorage::unreliable_wrapper(storage, config.test_remote_failures)
} else {
storage
};
let mut rx = std::pin::pin!(rx);
let mut rows = Vec::with_capacity(config.rows_per_group);
@@ -285,7 +295,7 @@ async fn worker_inner(
}
if len > config.file_size || force {
last_upload = time::Instant::now();
let file = upload_parquet(w, len, &storage).await?;
let file = upload_parquet(w, len, &storage_config, &config).await?;
w = SerializedFileWriter::new(file, schema.clone(), config.propeties.clone())?;
len = 0;
}
@@ -298,7 +308,7 @@ async fn worker_inner(
}
if !w.flushed_row_groups().is_empty() {
let _rtchk: Writer<BytesMut> = upload_parquet(w, len, &storage).await?;
let _rtchk: Writer<BytesMut> = upload_parquet(w, len, &storage_config, &config).await?;
}
Ok(())
@@ -340,7 +350,8 @@ where
async fn upload_parquet(
mut w: SerializedFileWriter<Writer<BytesMut>>,
len: i64,
storage: &GenericRemoteStorage,
storage_config: &RemoteStorageConfig,
config: &ParquetConfig,
) -> anyhow::Result<Writer<BytesMut>> {
let len_uncompressed = w
.flushed_row_groups()
@@ -377,6 +388,15 @@ async fn upload_parquet(
size, compression, "uploading request parquet file"
);
// A bug in azure-sdk means that the identity-token-file that expires after
// 1 hour is not refreshed. This identity-token is used to fetch the actual azure storage
// tokens that last for 24 hours. After this 24 hour period, azure-sdk tries to refresh
// the storage token, but the identity token has now expired.
// <https://github.com/Azure/azure-sdk-for-rust/issues/1739>
//
// To work around this, we recreate the storage every time.
let storage = config.storage(storage_config).await?;
let year = now.year();
let month = now.month();
let day = now.day();
@@ -431,8 +451,8 @@ mod tests {
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use remote_storage::{
GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind, S3Config,
DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
RemoteStorageConfig, RemoteStorageKind, S3Config, DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
};
use tokio::sync::mpsc;
use tokio::time;
@@ -559,12 +579,11 @@ mod tests {
timeout: std::time::Duration::from_secs(120),
small_timeout: std::time::Duration::from_secs(30),
};
let storage = GenericRemoteStorage::from_config(&remote_storage_config)
worker_inner(remote_storage_config, rx, config)
.await
.unwrap();
worker_inner(storage, rx, config).await.unwrap();
let mut files = WalkDir::new(tmpdir.as_std_path())
.into_iter()
.filter_map(|entry| entry.ok())