feat: add checksum for checkpoint data (#3651)

* feat: add checksum for checkpoint data

Signed-off-by: tison <wander4096@gmail.com>

* add test

Signed-off-by: tison <wander4096@gmail.com>

* clippy

Signed-off-by: tison <wander4096@gmail.com>

* fix: checksum should calculate on uncompressed data

Signed-off-by: tison <wander4096@gmail.com>

* address comments

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
This commit is contained in:
tison
2024-04-09 16:32:24 +08:00
committed by GitHub
parent 2398918adf
commit 2c0c7759ee
5 changed files with 176 additions and 30 deletions

26
Cargo.lock generated
View File

@@ -2285,9 +2285,9 @@ dependencies = [
[[package]]
name = "crc32fast"
version = "1.3.2"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d"
checksum = "b3855a8a784b474f333699ef2bbca9db2c4a1f6d9088a90a2d25b1eb53111eaa"
dependencies = [
"cfg-if 1.0.0",
]
@@ -3777,9 +3777,9 @@ dependencies = [
[[package]]
name = "h2"
version = "0.3.24"
version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb2c4422095b67ee78da96fbb51a4cc413b3b25883c7717ff7ca1ab31022c9c9"
checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8"
dependencies = [
"bytes",
"fnv",
@@ -3970,9 +3970,9 @@ dependencies = [
[[package]]
name = "http"
version = "0.2.12"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1"
checksum = "8947b1a6fad4393052c7ba1f4cd97bed3e953a95c79c92ad9b051a04611d9fbb"
dependencies = [
"bytes",
"fnv",
@@ -5306,6 +5306,7 @@ dependencies = [
"common-test-util",
"common-time",
"common-wal",
"crc32fast",
"criterion",
"datafusion",
"datafusion-common",
@@ -11424,6 +11425,12 @@ version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasite"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b"
[[package]]
name = "wasm-bindgen"
version = "0.2.89"
@@ -11579,11 +11586,12 @@ dependencies = [
[[package]]
name = "whoami"
version = "1.4.1"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22fc3756b8a9133049b26c7f61ab35416c130e8c09b660f5b3958b446f52cc50"
checksum = "a44ab49fad634e88f55bf8f9bb3abd2f27d7204172a112c7c9987e01c1c94ea9"
dependencies = [
"wasm-bindgen",
"redox_syscall 0.4.1",
"wasite",
"web-sys",
]

View File

@@ -31,6 +31,7 @@ common-telemetry.workspace = true
common-test-util = { workspace = true, optional = true }
common-time.workspace = true
common-wal.workspace = true
crc32fast = "1"
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true

View File

@@ -575,6 +575,9 @@ pub enum Error {
#[snafu(display("Invalid region options, {}", reason))]
InvalidRegionOptions { reason: String, location: Location },
#[snafu(display("checksum mismatch (actual: {}, expected: {})", actual, expected))]
ChecksumMismatch { actual: u32, expected: u32 },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -680,6 +683,7 @@ impl ErrorExt for Error {
Upload { .. } => StatusCode::StorageUnavailable,
BiError { .. } => StatusCode::Internal,
EncodeMemtable { .. } | ReadDataPart { .. } => StatusCode::Internal,
ChecksumMismatch { .. } => StatusCode::Unexpected,
}
}

View File

@@ -18,6 +18,7 @@ use std::str::FromStr;
use common_datasource::compression::CompressionType;
use common_telemetry::debug;
use crc32fast::Hasher;
use futures::future::try_join_all;
use futures::TryStreamExt;
use lazy_static::lazy_static;
@@ -29,8 +30,8 @@ use store_api::manifest::ManifestVersion;
use tokio::sync::Semaphore;
use crate::error::{
CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu, OpenDalSnafu, Result,
SerdeJsonSnafu, Utf8Snafu,
ChecksumMismatchSnafu, CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu,
OpenDalSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
};
lazy_static! {
@@ -46,7 +47,6 @@ const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed;
const FETCH_MANIFEST_PARALLELISM: usize = 16;
/// Returns the [CompressionType] according to whether to compress manifest files.
#[inline]
pub const fn manifest_compress_type(compress: bool) -> CompressionType {
if compress {
DEFAULT_MANIFEST_COMPRESSION_TYPE
@@ -55,17 +55,14 @@ pub const fn manifest_compress_type(compress: bool) -> CompressionType {
}
}
#[inline]
/// Builds the delta file name for `version`: the version number zero-padded
/// to 20 digits, followed by the `.json` extension.
pub fn delta_file(version: ManifestVersion) -> String {
    format!("{:020}.json", version)
}
#[inline]
/// Builds the checkpoint file name for `version`: the version number
/// zero-padded to 20 digits, followed by the `.checkpoint` extension.
pub fn checkpoint_file(version: ManifestVersion) -> String {
    format!("{:020}.checkpoint", version)
}
#[inline]
pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> String {
if compress_type == CompressionType::Uncompressed {
format!("{}{}", path, file)
@@ -74,11 +71,30 @@ pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> Strin
}
}
/// Computes the checksum of `data` by feeding the whole slice through a
/// fresh `crc32fast` [`Hasher`] and returning the finalized value.
fn checkpoint_checksum(data: &[u8]) -> u32 {
    let mut crc = Hasher::new();
    crc.update(data);
    crc.finalize()
}
/// Verifies `data` against the recorded checksum, if there is one.
///
/// When `wanted` is `None` nothing is checked and `Ok(())` is returned.
/// Otherwise the checksum of `data` is recomputed with `checkpoint_checksum`.
///
/// # Errors
/// Returns a `ChecksumMismatch` error carrying the recomputed (`actual`) and
/// recorded (`expected`) values when the two differ.
fn verify_checksum(data: &[u8], wanted: Option<u32>) -> Result<()> {
    // No recorded checksum: nothing to compare against.
    let Some(expected) = wanted else {
        return Ok(());
    };

    let actual = checkpoint_checksum(data);
    ensure!(
        expected == actual,
        ChecksumMismatchSnafu { actual, expected }
    );
    Ok(())
}
/// Returns the manifest version parsed from the file path.
///
/// # Panics
/// Panics if the file path is not a valid delta or checkpoint file.
#[inline]
/// If the file path is not a valid delta or checkpoint file.
pub fn file_version(path: &str) -> ManifestVersion {
let s = path.split('.').next().unwrap();
s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}"))
@@ -88,18 +104,15 @@ pub fn file_version(path: &str) -> ManifestVersion {
///
/// for example file
/// `00000000000000000000.json.gz` -> `CompressionType::GZIP`
#[inline]
pub fn file_compress_type(path: &str) -> CompressionType {
    // The text after the last '.' is treated as the compression suffix.
    let suffix = path.rsplit('.').next().unwrap_or_default();
    // Any suffix that does not parse as a compression type means "uncompressed".
    match CompressionType::from_str(suffix) {
        Ok(compress_type) => compress_type,
        Err(_) => CompressionType::Uncompressed,
    }
}
#[inline]
/// Returns `true` when `file_name` matches the `DELTA_RE` pattern.
pub fn is_delta_file(file_name: &str) -> bool {
DELTA_RE.is_match(file_name)
}
#[inline]
/// Returns `true` when `file_name` matches the `CHECKPOINT_RE` pattern.
pub fn is_checkpoint_file(file_name: &str) -> bool {
CHECKPOINT_RE.is_match(file_name)
}
@@ -368,6 +381,7 @@ impl ManifestObjectStore {
path: &path,
})?;
let checkpoint_size = data.len();
let checksum = checkpoint_checksum(bytes);
self.object_store
.write(&path, data)
.await
@@ -380,7 +394,7 @@ impl ManifestObjectStore {
let checkpoint_metadata = CheckpointMetadata {
size: bytes.len(),
version,
checksum: None,
checksum: Some(checksum),
extend_metadata: HashMap::new(),
};
@@ -398,10 +412,11 @@ impl ManifestObjectStore {
Ok(())
}
pub async fn load_checkpoint(
async fn load_checkpoint(
&mut self,
version: ManifestVersion,
metadata: CheckpointMetadata,
) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
let version = metadata.version;
let path = self.checkpoint_file_path(version);
// For backward compatibility, the user's checkpoint may not be compressed,
// so if the file is not found with the configured compression type, fall back to looking for the uncompressed checkpoint.
@@ -415,6 +430,7 @@ impl ManifestObjectStore {
path,
},
)?;
verify_checksum(&decompress_data, metadata.checksum)?;
// set the checkpoint size
self.set_checkpoint_file_size(version, checkpoint_size as u64);
Ok(Some(decompress_data))
@@ -441,6 +457,7 @@ impl ManifestObjectStore {
compress_type: FALL_BACK_COMPRESS_TYPE,
path,
})?;
verify_checksum(&decompress_data, metadata.checksum)?;
self.set_checkpoint_file_size(version, checkpoint_size as u64);
Ok(Some(decompress_data))
}
@@ -479,7 +496,7 @@ impl ManifestObjectStore {
last_checkpoint_path, checkpoint_metadata
);
self.load_checkpoint(checkpoint_metadata.version).await
self.load_checkpoint(checkpoint_metadata).await
}
#[cfg(test)]
@@ -487,6 +504,54 @@ impl ManifestObjectStore {
self.object_store.read(path).await.context(OpenDalSnafu)
}
/// Test-only helper that writes `bytes` as the checkpoint file of `version`
/// and then overwrites the last-checkpoint metadata file to reference it.
///
/// The metadata is written with a fixed checksum instead of one computed from
/// `bytes`, so the stored checksum is independent of the data written here.
///
/// # Errors
/// Returns an error if encoding `bytes` with the store's compression type
/// fails, if encoding the metadata fails, or if either object-store write
/// fails.
#[cfg(test)]
pub async fn write_last_checkpoint(
    &mut self,
    version: ManifestVersion,
    bytes: &[u8],
) -> Result<()> {
    let path = self.checkpoint_file_path(version);
    let data = self
        .compress_type
        .encode(bytes)
        .await
        .context(CompressObjectSnafu {
            compress_type: self.compress_type,
            path: &path,
        })?;
    // Track the encoded (on-disk) size, not the size of the raw input.
    let checkpoint_size = data.len();
    self.object_store
        .write(&path, data)
        .await
        .context(OpenDalSnafu)?;
    self.set_checkpoint_file_size(version, checkpoint_size as u64);

    let last_checkpoint_path = self.last_checkpoint_path();
    let checkpoint_metadata = CheckpointMetadata {
        size: bytes.len(),
        version,
        // Fixed value, deliberately not derived from `bytes`.
        // NOTE(review): presumably the checksum of the untouched checkpoint used by
        // the manager tests, so that writing different bytes triggers a mismatch on
        // load — confirm against the caller.
        checksum: Some(1218259706),
        extend_metadata: HashMap::new(),
    };
    debug!(
        "Rewrite checkpoint in path: {}, metadata: {:?}",
        last_checkpoint_path, checkpoint_metadata
    );

    let metadata_bytes = checkpoint_metadata.encode()?;
    // Overwrite the last checkpoint with the modified content. The buffer is not
    // used afterwards, so it is moved into the write instead of being cloned.
    self.object_store
        .write(&last_checkpoint_path, metadata_bytes)
        .await
        .context(OpenDalSnafu)?;

    Ok(())
}
/// Compute the size(Byte) in manifest size map.
pub(crate) fn total_manifest_size(&self) -> u64 {
self.manifest_size_map.values().sum()
@@ -509,7 +574,7 @@ struct CheckpointMetadata {
pub size: usize,
/// The latest version this checkpoint contains.
pub version: ManifestVersion,
pub checksum: Option<String>,
pub checksum: Option<u32>,
pub extend_metadata: HashMap<String, String>,
}
@@ -544,6 +609,15 @@ mod tests {
ManifestObjectStore::new("/", object_store, CompressionType::Uncompressed)
}
/// Builds a `CheckpointMetadata` for `version` with zero size, no checksum
/// and default (empty) extended metadata.
fn new_checkpoint_metadata_with_version(version: ManifestVersion) -> CheckpointMetadata {
    let extend_metadata = Default::default();
    CheckpointMetadata {
        version,
        size: 0,
        checksum: None,
        extend_metadata,
    }
}
#[test]
// This test mainly guards against future unintentional changes that would break backward compatibility.
fn test_compress_file_path_generation() {
@@ -608,7 +682,11 @@ mod tests {
//delete (,4) logs and keep checkpoint 3.
let _ = log_store.delete_until(4, true).await.unwrap();
let _ = log_store.load_checkpoint(3).await.unwrap().unwrap();
let _ = log_store
.load_checkpoint(new_checkpoint_metadata_with_version(3))
.await
.unwrap()
.unwrap();
let _ = log_store.load_last_checkpoint().await.unwrap().unwrap();
let manifests = log_store.scan(0, 11).await.unwrap();
let manifests = log_store.fetch_manifests(&manifests).await.unwrap();
@@ -621,7 +699,11 @@ mod tests {
// delete all logs and checkpoints
let _ = log_store.delete_until(11, false).await.unwrap();
assert!(log_store.load_checkpoint(3).await.unwrap().is_none());
assert!(log_store
.load_checkpoint(new_checkpoint_metadata_with_version(3))
.await
.unwrap()
.is_none());
assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
let manifests = log_store.scan(0, 11).await.unwrap();
let manifests = log_store.fetch_manifests(&manifests).await.unwrap();
@@ -656,7 +738,7 @@ mod tests {
assert_eq!(v, 5);
assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
// write compressed data to stimulate compress alogorithom take effect
// write compressed data to stimulate compress algorithm take effect
for v in 5..10 {
log_store
.save(v, format!("hello, {v}").as_bytes())
@@ -678,7 +760,11 @@ mod tests {
assert_eq!(v, version);
assert_eq!(format!("hello, {v}").as_bytes(), bytes);
}
let (v, checkpoint) = log_store.load_checkpoint(5).await.unwrap().unwrap();
let (v, checkpoint) = log_store
.load_checkpoint(new_checkpoint_metadata_with_version(5))
.await
.unwrap()
.unwrap();
assert_eq!(v, 5);
assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();

View File

@@ -12,12 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::sync::Arc;
use common_datasource::compression::CompressionType;
use store_api::storage::RegionId;
use strum::IntoEnumIterator;
use crate::error::Error::ChecksumMismatch;
use crate::manifest::action::{
RegionCheckpoint, RegionEdit, RegionMetaAction, RegionMetaActionList,
};
@@ -150,7 +152,8 @@ async fn manager_with_checkpoint_distance_1() {
.await
.unwrap();
let raw_json = std::str::from_utf8(&raw_bytes).unwrap();
let expected_json = "{\"size\":846,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}";
let expected_json =
"{\"size\":846,\"version\":9,\"checksum\":1218259706,\"extend_metadata\":{}}";
assert_eq!(expected_json, raw_json);
// reopen the manager
@@ -159,6 +162,50 @@ async fn manager_with_checkpoint_distance_1() {
assert_eq!(10, manager.manifest().await.manifest_version);
}
// Checks that loading the last checkpoint fails with `ChecksumMismatch` once
// the checkpoint data no longer matches the checksum stored in its metadata.
#[tokio::test]
async fn test_corrupted_data_causing_checksum_error() {
// Initialize manager
common_telemetry::init_default_ut_logging();
let (_env, manager) = build_manager(1, CompressionType::Uncompressed).await;
// Apply actions
for _ in 0..10 {
manager.update(nop_action()).await.unwrap();
}
// Check if there is a checkpoint
assert!(manager
.store()
.await
.load_last_checkpoint()
.await
.unwrap()
.is_some());
// Read the file stored at the last-checkpoint path and flip the lowest bit
// of its first byte to obtain altered content.
let mut corrupted_bytes = manager
.store()
.await
.read_file(&manager.store().await.last_checkpoint_path())
.await
.unwrap();
corrupted_bytes[0] ^= 1;
// Write the altered bytes as the checkpoint data for version 9
manager
.store()
.await
.write_last_checkpoint(9, &corrupted_bytes)
.await
.unwrap();
// Attempt to load the corrupted checkpoint
let load_corrupted_result = manager.store().await.load_last_checkpoint().await;
// The load must fail, and the error must be the `ChecksumMismatch` variant
assert_matches!(load_corrupted_result, Err(ChecksumMismatch { .. }));
}
#[tokio::test]
async fn checkpoint_with_different_compression_types() {
common_telemetry::init_default_ut_logging();