feat(mito2): expose puffin index metadata (#7042)

* Add encode/decode helpers for IndexTarget

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* Use IndexTarget encode for puffin index blob keys

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* Normalize puffin index blobs to use IndexTarget keys

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat(mito2): expose puffin index metadata

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* target json polish

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix header

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* add index path

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address copilot comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* reuse cached index metadata

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* parallelism for reading index meta

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2025-10-17 14:22:07 +08:00
committed by GitHub
parent 21532abf94
commit 7b396bb290
19 changed files with 1206 additions and 30 deletions

2
Cargo.lock generated
View File

@@ -6125,6 +6125,7 @@ dependencies = [
"serde",
"serde_json",
"snafu 0.8.6",
"store-api",
"tantivy",
"tantivy-jieba",
"tempfile",
@@ -7632,6 +7633,7 @@ dependencies = [
"dotenv",
"either",
"futures",
"greptime-proto",
"humantime-serde",
"index",
"itertools 0.14.0",

View File

@@ -34,6 +34,7 @@ roaring = "0.10"
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
store-api.workspace = true
tantivy = { version = "0.24", features = ["zstd-compression"] }
tantivy-jieba = "0.16"
tokio.workspace = true

View File

@@ -75,3 +75,12 @@ impl Config {
Ok(Self::default())
}
}
impl Analyzer {
pub fn to_str(&self) -> &'static str {
match self {
Analyzer::English => "English",
Analyzer::Chinese => "Chinese",
}
}
}

View File

@@ -21,6 +21,7 @@ pub mod error;
pub mod external_provider;
pub mod fulltext_index;
pub mod inverted_index;
pub mod target;
pub type Bytes = Vec<u8>;
pub type BytesRef<'a> = &'a [u8];

107
src/index/src/target.rs Normal file
View File

@@ -0,0 +1,107 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::fmt::{self, Display};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use serde::{Deserialize, Serialize};
use snafu::{Snafu, ensure};
use store_api::storage::ColumnId;
/// Describes an index target. Column ids are the only supported variant for now.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum IndexTarget {
ColumnId(ColumnId),
}
impl Display for IndexTarget {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
IndexTarget::ColumnId(id) => write!(f, "{}", id),
}
}
}
impl IndexTarget {
/// Parse a target key string back into an index target description.
pub fn decode(key: &str) -> Result<Self, TargetKeyError> {
validate_column_key(key)?;
let id = key
.parse::<ColumnId>()
.map_err(|_| InvalidColumnIdSnafu { value: key }.build())?;
Ok(IndexTarget::ColumnId(id))
}
}
/// Errors that can occur when working with index target keys.
#[derive(Snafu, Clone, PartialEq, Eq)]
#[stack_trace_debug]
pub enum TargetKeyError {
#[snafu(display("target key cannot be empty"))]
Empty,
#[snafu(display("target key must contain digits only: {key}"))]
InvalidCharacters { key: String },
#[snafu(display("failed to parse column id from '{value}'"))]
InvalidColumnId { value: String },
}
impl ErrorExt for TargetKeyError {
fn status_code(&self) -> StatusCode {
StatusCode::InvalidArguments
}
fn as_any(&self) -> &dyn Any {
self
}
}
fn validate_column_key(key: &str) -> Result<(), TargetKeyError> {
ensure!(!key.is_empty(), EmptySnafu);
ensure!(
key.chars().all(|ch| ch.is_ascii_digit()),
InvalidCharactersSnafu { key }
);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn encode_decode_column() {
let target = IndexTarget::ColumnId(42);
let key = format!("{}", target);
assert_eq!(key, "42");
let decoded = IndexTarget::decode(&key).unwrap();
assert_eq!(decoded, target);
}
#[test]
fn decode_rejects_empty() {
let err = IndexTarget::decode("").unwrap_err();
assert!(matches!(err, TargetKeyError::Empty));
}
#[test]
fn decode_rejects_invalid_digits() {
let err = IndexTarget::decode("1a2").unwrap_err();
assert!(matches!(err, TargetKeyError::InvalidCharacters { .. }));
}
}

View File

@@ -50,6 +50,7 @@ futures.workspace = true
humantime-serde.workspace = true
index.workspace = true
itertools.workspace = true
greptime-proto.workspace = true
lazy_static = "1.4"
log-store = { workspace = true }
mito-codec.workspace = true

View File

@@ -67,6 +67,8 @@ mod sync_test;
#[cfg(test)]
mod truncate_test;
mod puffin_index;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
@@ -78,7 +80,7 @@ use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{info, tracing};
use common_telemetry::{info, tracing, warn};
use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
use futures::future::{join_all, try_join_all};
use futures::stream::{self, Stream, StreamExt};
@@ -97,12 +99,14 @@ use store_api::region_engine::{
RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
use store_api::storage::{FileId, RegionId, ScanRequest, SequenceNumber};
use tokio::sync::{Semaphore, oneshot};
use crate::access_layer::RegionFilePathFactory;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::config::MitoConfig;
use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin};
use crate::error::{
InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
SerdeJsonSnafu, SerializeColumnMetadataSnafu,
@@ -117,7 +121,7 @@ use crate::read::stream::ScanBatchStream;
use crate::region::MitoRegionRef;
use crate::region::opener::PartitionExprFetcherRef;
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::sst::file::FileMeta;
use crate::sst::file::{FileMeta, RegionFileId};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::wal::entry_distributor::{
DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, build_wal_entry_distributor_and_receivers,
@@ -434,6 +438,89 @@ impl MitoEngine {
results
}
/// Lists metadata about all puffin index targets stored in the engine.
pub async fn all_index_metas(&self) -> Vec<PuffinIndexMetaEntry> {
let node_id = self.inner.workers.file_ref_manager().node_id();
let cache_manager = self.inner.workers.cache_manager();
let puffin_metadata_cache = cache_manager.puffin_metadata_cache().cloned();
let bloom_filter_cache = cache_manager.bloom_filter_index_cache().cloned();
let inverted_index_cache = cache_manager.inverted_index_cache().cloned();
let mut results = Vec::new();
for region in self.inner.workers.all_regions() {
let manifest_entries = region.manifest_sst_entries().await;
let access_layer = region.access_layer.clone();
let table_dir = access_layer.table_dir().to_string();
let path_type = access_layer.path_type();
let object_store = access_layer.object_store().clone();
let puffin_factory = access_layer.puffin_manager_factory().clone();
let path_factory = RegionFilePathFactory::new(table_dir, path_type);
let entry_futures = manifest_entries.into_iter().map(|entry| {
let object_store = object_store.clone();
let path_factory = path_factory.clone();
let puffin_factory = puffin_factory.clone();
let puffin_metadata_cache = puffin_metadata_cache.clone();
let bloom_filter_cache = bloom_filter_cache.clone();
let inverted_index_cache = inverted_index_cache.clone();
async move {
let Some(index_file_path) = entry.index_file_path.as_ref() else {
return Vec::new();
};
let file_id = match FileId::parse_str(&entry.file_id) {
Ok(file_id) => file_id,
Err(err) => {
warn!(
err;
"Failed to parse puffin index file id, table_dir: {}, file_id: {}",
entry.table_dir,
entry.file_id
);
return Vec::new();
}
};
let region_file_id = RegionFileId::new(entry.region_id, file_id);
let context = IndexEntryContext {
table_dir: &entry.table_dir,
index_file_path: index_file_path.as_str(),
region_id: entry.region_id,
table_id: entry.table_id,
region_number: entry.region_number,
region_group: entry.region_group,
region_sequence: entry.region_sequence,
file_id: &entry.file_id,
index_file_size: entry.index_file_size,
node_id,
};
let manager = puffin_factory
.build(object_store, path_factory)
.with_puffin_metadata_cache(puffin_metadata_cache);
collect_index_entries_from_puffin(
manager,
region_file_id,
context,
bloom_filter_cache,
inverted_index_cache,
)
.await
}
});
let mut meta_stream = stream::iter(entry_futures).buffer_unordered(8); // Parallelism is 8.
while let Some(mut metas) = meta_stream.next().await {
results.append(&mut metas);
}
}
results
}
/// Lists all SSTs from the storage layer of all regions in the engine.
pub fn all_ssts_from_storage(&self) -> impl Stream<Item = Result<StorageSstEntry>> {
let node_id = self.inner.workers.file_ref_manager().node_id();

View File

@@ -819,3 +819,150 @@ StorageSstEntry { file_path: "test/22_0000000042/<file_id>.parquet", file_size:
StorageSstEntry { file_path: "test/22_0000000042/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }"#
);
}
#[tokio::test]
async fn test_all_index_metas_list_all_types() {
use datatypes::schema::{
FulltextAnalyzer, FulltextBackend, FulltextOptions, SkippingIndexOptions, SkippingIndexType,
};
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
// One region with both fulltext backends and inverted index enabled, plus bloom skipping index
let region_id = RegionId::new(11, 1);
let mut request = CreateRequestBuilder::new().tag_num(3).field_num(2).build();
// inverted index on tag_0
request.column_metadatas[0]
.column_schema
.set_inverted_index(true);
// fulltext bloom on tag_1
let ft_bloom = FulltextOptions::new_unchecked(
true,
FulltextAnalyzer::English,
false,
FulltextBackend::Bloom,
4,
0.001,
);
request.column_metadatas[1]
.column_schema
.set_fulltext_options(&ft_bloom)
.unwrap();
// fulltext tantivy on tag_2
let ft_tantivy = FulltextOptions::new_unchecked(
true,
FulltextAnalyzer::Chinese,
true,
FulltextBackend::Tantivy,
2,
0.01,
);
request.column_metadatas[2]
.column_schema
.set_fulltext_options(&ft_tantivy)
.unwrap();
// bloom filter skipping index on field_1 (which is at index 3)
let skipping = SkippingIndexOptions::new_unchecked(2, 0.01, SkippingIndexType::BloomFilter);
request.column_metadatas[3]
.column_schema
.set_skipping_options(&skipping)
.unwrap();
// inverted index on field_1
request.column_metadatas[4]
.column_schema
.set_inverted_index(true);
engine
.handle_request(region_id, RegionRequest::Create(request.clone()))
.await
.unwrap();
// write some rows (schema: tag_0, tag_1, tag_2, field_0, field_1, ts)
let column_schemas = rows_schema(&request);
let rows_vec: Vec<api::v1::Row> = (0..20)
.map(|ts| api::v1::Row {
values: vec![
api::v1::Value {
value_data: Some(api::v1::value::ValueData::StringValue("x".to_string())),
},
api::v1::Value {
value_data: Some(api::v1::value::ValueData::StringValue("y".to_string())),
},
api::v1::Value {
value_data: Some(api::v1::value::ValueData::StringValue("z".to_string())),
},
api::v1::Value {
value_data: Some(api::v1::value::ValueData::F64Value(ts as f64)),
},
api::v1::Value {
value_data: Some(api::v1::value::ValueData::F64Value((20 - ts) as f64)),
},
api::v1::Value {
value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
ts as i64 * 1000,
)),
},
],
})
.collect();
let rows = api::v1::Rows {
schema: column_schemas.clone(),
rows: rows_vec,
};
put_rows(&engine, region_id, rows).await;
// flush to generate sst and indexes
engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
.unwrap();
fn bucket_size(size: u64) -> u64 {
if size < 512 { size } else { (size / 16) * 16 }
}
let mut metas = engine.all_index_metas().await;
for entry in &mut metas {
entry.index_file_path = entry.index_file_path.replace(&entry.file_id, "<file_id>");
entry.file_id = "<file_id>".to_string();
entry.index_file_size = entry.index_file_size.map(bucket_size);
if entry.index_type == "fulltext_tantivy" {
entry.blob_size = bucket_size(entry.blob_size);
}
if let Some(meta_json) = entry.meta_json.as_mut()
&& let Ok(mut value) = serde_json::from_str::<serde_json::Value>(meta_json)
{
if let Some(inverted) = value.get_mut("inverted").and_then(|v| v.as_object_mut()) {
inverted.insert("base_offset".to_string(), serde_json::Value::from(0));
}
*meta_json = value.to_string();
}
}
metas.sort_by(|a, b| {
(a.index_type.as_str(), a.target_key.as_str())
.cmp(&(b.index_type.as_str(), b.target_key.as_str()))
});
let debug_format = metas
.iter()
.map(|entry| format!("\n{:?}", entry))
.collect::<String>();
assert_eq!(
debug_format,
r#"
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6032), index_type: "bloom_filter", target_type: "column", target_key: "3", target_json: "{\"column\":3}", blob_size: 751, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":640,\"row_count\":20,\"rows_per_segment\":2,\"segment_count\":10}}"), node_id: None }
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6032), index_type: "fulltext_bloom", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 87, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":64,\"row_count\":20,\"rows_per_segment\":4,\"segment_count\":5},\"fulltext\":{\"analyzer\":\"English\",\"case_sensitive\":false}}"), node_id: None }
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6032), index_type: "fulltext_tantivy", target_type: "column", target_key: "2", target_json: "{\"column\":2}", blob_size: 1104, meta_json: Some("{\"fulltext\":{\"analyzer\":\"Chinese\",\"case_sensitive\":true}}"), node_id: None }
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6032), index_type: "inverted", target_type: "column", target_key: "0", target_json: "{\"column\":0}", blob_size: 70, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":44,\"inverted_index_size\":70,\"null_bitmap_size\":8,\"relative_fst_offset\":26,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6032), index_type: "inverted", target_type: "column", target_key: "4", target_json: "{\"column\":4}", blob_size: 515, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":147,\"inverted_index_size\":515,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }"#
);
}

View File

@@ -0,0 +1,502 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::convert::TryFrom;
use common_base::range_read::RangeReader;
use common_telemetry::warn;
use greptime_proto::v1::index::{BloomFilterMeta, InvertedIndexMeta, InvertedIndexMetas};
use index::bitmap::BitmapType;
use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
use index::fulltext_index::Config as FulltextConfig;
use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
use index::target::IndexTarget;
use puffin::blob_metadata::BlobMetadata;
use puffin::puffin_manager::{PuffinManager, PuffinReader};
use serde_json::{Map, Value, json};
use store_api::sst_entry::PuffinIndexMetaEntry;
use store_api::storage::{ColumnId, RegionGroup, RegionId, RegionNumber, RegionSeq, TableId};
use crate::cache::index::bloom_filter_index::{
BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
};
use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
use crate::sst::file::RegionFileId;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE as BLOOM_BLOB_TYPE;
use crate::sst::index::fulltext_index::{
INDEX_BLOB_TYPE_BLOOM as FULLTEXT_BLOOM_BLOB_TYPE,
INDEX_BLOB_TYPE_TANTIVY as FULLTEXT_TANTIVY_BLOB_TYPE,
};
use crate::sst::index::inverted_index::INDEX_BLOB_TYPE as INVERTED_BLOB_TYPE;
use crate::sst::index::puffin_manager::{SstPuffinManager, SstPuffinReader};
const INDEX_TYPE_BLOOM: &str = "bloom_filter";
const INDEX_TYPE_FULLTEXT_BLOOM: &str = "fulltext_bloom";
const INDEX_TYPE_FULLTEXT_TANTIVY: &str = "fulltext_tantivy";
const INDEX_TYPE_INVERTED: &str = "inverted";
const TARGET_TYPE_UNKNOWN: &str = "unknown";
const TARGET_TYPE_COLUMN: &str = "column";
pub(crate) struct IndexEntryContext<'a> {
pub(crate) table_dir: &'a str,
pub(crate) index_file_path: &'a str,
pub(crate) region_id: RegionId,
pub(crate) table_id: TableId,
pub(crate) region_number: RegionNumber,
pub(crate) region_group: RegionGroup,
pub(crate) region_sequence: RegionSeq,
pub(crate) file_id: &'a str,
pub(crate) index_file_size: Option<u64>,
pub(crate) node_id: Option<u64>,
}
/// Collect index metadata entries present in the SST puffin file.
pub(crate) async fn collect_index_entries_from_puffin(
manager: SstPuffinManager,
region_file_id: RegionFileId,
context: IndexEntryContext<'_>,
bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
inverted_index_cache: Option<InvertedIndexCacheRef>,
) -> Vec<PuffinIndexMetaEntry> {
let mut entries = Vec::new();
let reader = match manager.reader(&region_file_id).await {
Ok(reader) => reader,
Err(err) => {
warn!(
err;
"Failed to open puffin index file, table_dir: {}, file_id: {}",
context.table_dir,
context.file_id
);
return entries;
}
};
let file_metadata = match reader.metadata().await {
Ok(metadata) => metadata,
Err(err) => {
warn!(
err;
"Failed to read puffin file metadata, table_dir: {}, file_id: {}",
context.table_dir,
context.file_id
);
return entries;
}
};
for blob in &file_metadata.blobs {
match BlobIndexTypeTargetKey::from_blob_type(&blob.blob_type) {
Some(BlobIndexTypeTargetKey::BloomFilter(target_key)) => {
let bloom_meta = try_read_bloom_meta(
&reader,
region_file_id,
blob.blob_type.as_str(),
target_key,
bloom_filter_cache.as_ref(),
Tag::Skipping,
&context,
)
.await;
let bloom_value = bloom_meta.as_ref().map(bloom_meta_value);
let (target_type, target_json) = decode_target_info(target_key);
let meta_json = build_meta_json(bloom_value, None, None);
let entry = build_index_entry(
&context,
INDEX_TYPE_BLOOM,
target_type,
target_key.to_string(),
target_json,
blob.length as u64,
meta_json,
);
entries.push(entry);
}
Some(BlobIndexTypeTargetKey::FulltextBloom(target_key)) => {
let bloom_meta = try_read_bloom_meta(
&reader,
region_file_id,
blob.blob_type.as_str(),
target_key,
bloom_filter_cache.as_ref(),
Tag::Fulltext,
&context,
)
.await;
let bloom_value = bloom_meta.as_ref().map(bloom_meta_value);
let fulltext_value = Some(fulltext_meta_value(blob));
let (target_type, target_json) = decode_target_info(target_key);
let meta_json = build_meta_json(bloom_value, fulltext_value, None);
let entry = build_index_entry(
&context,
INDEX_TYPE_FULLTEXT_BLOOM,
target_type,
target_key.to_string(),
target_json,
blob.length as u64,
meta_json,
);
entries.push(entry);
}
Some(BlobIndexTypeTargetKey::FulltextTantivy(target_key)) => {
let fulltext_value = Some(fulltext_meta_value(blob));
let (target_type, target_json) = decode_target_info(target_key);
let meta_json = build_meta_json(None, fulltext_value, None);
let entry = build_index_entry(
&context,
INDEX_TYPE_FULLTEXT_TANTIVY,
target_type,
target_key.to_string(),
target_json,
blob.length as u64,
meta_json,
);
entries.push(entry);
}
Some(BlobIndexTypeTargetKey::Inverted) => {
let mut inverted_entries = collect_inverted_entries(
&reader,
region_file_id,
inverted_index_cache.as_ref(),
&context,
)
.await;
entries.append(&mut inverted_entries);
}
None => {}
}
}
entries
}
async fn collect_inverted_entries(
reader: &SstPuffinReader,
region_file_id: RegionFileId,
cache: Option<&InvertedIndexCacheRef>,
context: &IndexEntryContext<'_>,
) -> Vec<PuffinIndexMetaEntry> {
// Read the inverted index blob and surface its per-column metadata entries.
let file_id = region_file_id.file_id();
let guard = match reader.blob(INVERTED_BLOB_TYPE).await {
Ok(guard) => guard,
Err(err) => {
warn!(
err;
"Failed to open inverted index blob, table_dir: {}, file_id: {}",
context.table_dir,
context.file_id
);
return Vec::new();
}
};
let blob_reader = match guard.reader().await {
Ok(reader) => reader,
Err(err) => {
warn!(
err;
"Failed to build inverted index blob reader, table_dir: {}, file_id: {}",
context.table_dir,
context.file_id
);
return Vec::new();
}
};
let blob_size = blob_reader
.metadata()
.await
.ok()
.map(|meta| meta.content_length);
let metas = if let (Some(cache), Some(blob_size)) = (cache, blob_size) {
let reader = CachedInvertedIndexBlobReader::new(
file_id,
blob_size,
InvertedIndexBlobReader::new(blob_reader),
cache.clone(),
);
match reader.metadata().await {
Ok(metas) => metas,
Err(err) => {
warn!(
err;
"Failed to read inverted index metadata, table_dir: {}, file_id: {}",
context.table_dir,
context.file_id
);
return Vec::new();
}
}
} else {
let reader = InvertedIndexBlobReader::new(blob_reader);
match reader.metadata().await {
Ok(metas) => metas,
Err(err) => {
warn!(
err;
"Failed to read inverted index metadata, table_dir: {}, file_id: {}",
context.table_dir,
context.file_id
);
return Vec::new();
}
}
};
build_inverted_entries(context, metas.as_ref())
}
fn build_inverted_entries(
context: &IndexEntryContext<'_>,
metas: &InvertedIndexMetas,
) -> Vec<PuffinIndexMetaEntry> {
let mut entries = Vec::new();
for (name, meta) in &metas.metas {
let (target_type, target_json) = decode_target_info(name);
let inverted_value = inverted_meta_value(meta, metas);
let meta_json = build_meta_json(None, None, Some(inverted_value));
let entry = build_index_entry(
context,
INDEX_TYPE_INVERTED,
target_type,
name.clone(),
target_json,
meta.inverted_index_size,
meta_json,
);
entries.push(entry);
}
entries
}
async fn try_read_bloom_meta(
reader: &SstPuffinReader,
region_file_id: RegionFileId,
blob_type: &str,
target_key: &str,
cache: Option<&BloomFilterIndexCacheRef>,
tag: Tag,
context: &IndexEntryContext<'_>,
) -> Option<BloomFilterMeta> {
let column_id = decode_column_id(target_key);
// Failures are logged but do not abort the overall metadata collection.
match reader.blob(blob_type).await {
Ok(guard) => match guard.reader().await {
Ok(blob_reader) => {
let blob_size = blob_reader
.metadata()
.await
.ok()
.map(|meta| meta.content_length);
let bloom_reader = BloomFilterReaderImpl::new(blob_reader);
let result = match (cache, column_id, blob_size) {
(Some(cache), Some(column_id), Some(blob_size)) => {
CachedBloomFilterIndexBlobReader::new(
region_file_id.file_id(),
column_id,
tag,
blob_size,
bloom_reader,
cache.clone(),
)
.metadata()
.await
}
_ => bloom_reader.metadata().await,
};
match result {
Ok(meta) => Some(meta),
Err(err) => {
warn!(
err;
"Failed to read index metadata, table_dir: {}, file_id: {}, blob: {}",
context.table_dir,
context.file_id,
blob_type
);
None
}
}
}
Err(err) => {
warn!(
err;
"Failed to open index blob reader, table_dir: {}, file_id: {}, blob: {}",
context.table_dir,
context.file_id,
blob_type
);
None
}
},
Err(err) => {
warn!(
err;
"Failed to open index blob, table_dir: {}, file_id: {}, blob: {}",
context.table_dir,
context.file_id,
blob_type
);
None
}
}
}
fn decode_target_info(target_key: &str) -> (String, String) {
match IndexTarget::decode(target_key) {
Ok(IndexTarget::ColumnId(id)) => (
TARGET_TYPE_COLUMN.to_string(),
json!({ "column": id }).to_string(),
),
_ => (
TARGET_TYPE_UNKNOWN.to_string(),
json!({ "error": "failed_to_decode" }).to_string(),
),
}
}
fn decode_column_id(target_key: &str) -> Option<ColumnId> {
match IndexTarget::decode(target_key) {
Ok(IndexTarget::ColumnId(id)) => Some(id),
_ => None,
}
}
fn bloom_meta_value(meta: &BloomFilterMeta) -> Value {
json!({
"rows_per_segment": meta.rows_per_segment,
"segment_count": meta.segment_count,
"row_count": meta.row_count,
"bloom_filter_size": meta.bloom_filter_size,
})
}
fn fulltext_meta_value(blob: &BlobMetadata) -> Value {
let config = FulltextConfig::from_blob_metadata(blob).unwrap_or_default();
json!({
"analyzer": config.analyzer.to_str(),
"case_sensitive": config.case_sensitive,
})
}
fn inverted_meta_value(meta: &InvertedIndexMeta, metas: &InvertedIndexMetas) -> Value {
let bitmap_type = BitmapType::try_from(meta.bitmap_type)
.map(|bt| format!("{:?}", bt))
.unwrap_or_else(|_| meta.bitmap_type.to_string());
json!({
"bitmap_type": bitmap_type,
"base_offset": meta.base_offset,
"inverted_index_size": meta.inverted_index_size,
"relative_fst_offset": meta.relative_fst_offset,
"fst_size": meta.fst_size,
"relative_null_bitmap_offset": meta.relative_null_bitmap_offset,
"null_bitmap_size": meta.null_bitmap_size,
"segment_row_count": metas.segment_row_count,
"total_row_count": metas.total_row_count,
})
}
fn build_meta_json(
bloom: Option<Value>,
fulltext: Option<Value>,
inverted: Option<Value>,
) -> Option<String> {
let mut map = Map::new();
if let Some(value) = bloom {
map.insert("bloom".to_string(), value);
}
if let Some(value) = fulltext {
map.insert("fulltext".to_string(), value);
}
if let Some(value) = inverted {
map.insert("inverted".to_string(), value);
}
if map.is_empty() {
None
} else {
Some(Value::Object(map).to_string())
}
}
enum BlobIndexTypeTargetKey<'a> {
BloomFilter(&'a str),
FulltextBloom(&'a str),
FulltextTantivy(&'a str),
Inverted,
}
impl<'a> BlobIndexTypeTargetKey<'a> {
fn from_blob_type(blob_type: &'a str) -> Option<Self> {
if let Some(target_key) = Self::target_key_from_blob(blob_type, BLOOM_BLOB_TYPE) {
Some(BlobIndexTypeTargetKey::BloomFilter(target_key))
} else if let Some(target_key) =
Self::target_key_from_blob(blob_type, FULLTEXT_BLOOM_BLOB_TYPE)
{
Some(BlobIndexTypeTargetKey::FulltextBloom(target_key))
} else if let Some(target_key) =
Self::target_key_from_blob(blob_type, FULLTEXT_TANTIVY_BLOB_TYPE)
{
Some(BlobIndexTypeTargetKey::FulltextTantivy(target_key))
} else if blob_type == INVERTED_BLOB_TYPE {
Some(BlobIndexTypeTargetKey::Inverted)
} else {
None
}
}
fn target_key_from_blob(blob_type: &'a str, prefix: &str) -> Option<&'a str> {
// Blob types encode their target as "<prefix>-<target>".
blob_type
.strip_prefix(prefix)
.and_then(|suffix| suffix.strip_prefix('-'))
}
}
fn build_index_entry(
context: &IndexEntryContext<'_>,
index_type: &str,
target_type: String,
target_key: String,
target_json: String,
blob_size: u64,
meta_json: Option<String>,
) -> PuffinIndexMetaEntry {
PuffinIndexMetaEntry {
table_dir: context.table_dir.to_string(),
index_file_path: context.index_file_path.to_string(),
region_id: context.region_id,
table_id: context.table_id,
region_number: context.region_number,
region_group: context.region_group,
region_sequence: context.region_sequence,
file_id: context.file_id.to_string(),
index_file_size: context.index_file_size,
index_type: index_type.to_string(),
target_type,
target_key,
target_json,
blob_size,
meta_json,
node_id: context.node_id,
}
}

View File

@@ -15,4 +15,4 @@
pub(crate) mod applier;
pub(crate) mod creator;
const INDEX_BLOB_TYPE: &str = "greptime-bloom-filter-v1";
pub(crate) const INDEX_BLOB_TYPE: &str = "greptime-bloom-filter-v1";

View File

@@ -22,6 +22,7 @@ use common_base::range_read::RangeReader;
use common_telemetry::warn;
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
use index::target::IndexTarget;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{PuffinManager, PuffinReader};
@@ -263,12 +264,14 @@ impl BloomFilterIndexApplier {
file_cache.local_store(),
WriteCachePathProvider::new(file_cache.clone()),
);
let blob_name = Self::column_blob_name(column_id);
let reader = puffin_manager
.reader(&file_id)
.await
.context(PuffinBuildReaderSnafu)?
.with_file_size_hint(file_size_hint)
.blob(&Self::column_blob_name(column_id))
.blob(&blob_name)
.await
.context(PuffinReadBlobSnafu)?
.reader()
@@ -279,7 +282,7 @@ impl BloomFilterIndexApplier {
// TODO(ruihang): use the same util with the code in creator
fn column_blob_name(column_id: ColumnId) -> String {
format!("{INDEX_BLOB_TYPE}-{column_id}")
format!("{INDEX_BLOB_TYPE}-{}", IndexTarget::ColumnId(column_id))
}
/// Creates a blob reader from the remote index file
@@ -297,12 +300,14 @@ impl BloomFilterIndexApplier {
)
.with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
let blob_name = Self::column_blob_name(column_id);
puffin_manager
.reader(&file_id)
.await
.context(PuffinBuildReaderSnafu)?
.with_file_size_hint(file_size_hint)
.blob(&Self::column_blob_name(column_id))
.blob(&blob_name)
.await
.context(PuffinReadBlobSnafu)?
.reader()

View File

@@ -21,6 +21,7 @@ use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::SkippingIndexType;
use datatypes::vectors::Helper;
use index::bloom_filter::creator::BloomFilterCreator;
use index::target::IndexTarget;
use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
use mito_codec::row_converter::SortField;
use puffin::puffin_manager::{PuffinWriter, PutOptions};
@@ -381,7 +382,8 @@ impl BloomFilterIndexer {
) -> Result<ByteCount> {
let (tx, rx) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
let blob_name = format!("{}-{}", INDEX_BLOB_TYPE, col_id);
let target_key = IndexTarget::ColumnId(*col_id);
let blob_name = format!("{INDEX_BLOB_TYPE}-{target_key}");
let (index_finish, puffin_add_blob) = futures::join!(
creator.finish(tx.compat_write()),
puffin_writer.put_blob(

View File

@@ -15,5 +15,5 @@
pub(crate) mod applier;
pub(crate) mod creator;
const INDEX_BLOB_TYPE_TANTIVY: &str = "greptime-fulltext-index-v1";
const INDEX_BLOB_TYPE_BLOOM: &str = "greptime-fulltext-index-bloom";
pub(crate) const INDEX_BLOB_TYPE_TANTIVY: &str = "greptime-fulltext-index-v1";
pub(crate) const INDEX_BLOB_TYPE_BLOOM: &str = "greptime-fulltext-index-bloom";

View File

@@ -24,6 +24,7 @@ use index::bloom_filter::reader::BloomFilterReaderImpl;
use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
use index::fulltext_index::tokenizer::{ChineseTokenizer, EnglishTokenizer, Tokenizer};
use index::fulltext_index::{Analyzer, Config};
use index::target::IndexTarget;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{GuardWithMetadata, PuffinManager, PuffinReader};
@@ -171,7 +172,10 @@ impl FulltextIndexApplier {
column_id: ColumnId,
request: &FulltextRequest,
) -> Result<Option<BTreeSet<RowId>>> {
let blob_key = format!("{INDEX_BLOB_TYPE_TANTIVY}-{column_id}");
let blob_key = format!(
"{INDEX_BLOB_TYPE_TANTIVY}-{}",
IndexTarget::ColumnId(column_id)
);
let dir = self
.index_source
.dir(file_id, &blob_key, file_size_hint)
@@ -283,7 +287,10 @@ impl FulltextIndexApplier {
terms: &[FulltextTerm],
output: &mut [(usize, Vec<Range<usize>>)],
) -> Result<bool> {
let blob_key = format!("{INDEX_BLOB_TYPE_BLOOM}-{column_id}");
let blob_key = format!(
"{INDEX_BLOB_TYPE_BLOOM}-{}",
IndexTarget::ColumnId(column_id)
);
let Some(reader) = self
.index_source
.blob(file_id, &blob_key, file_size_hint)

View File

@@ -25,6 +25,7 @@ use index::fulltext_index::create::{
BloomFilterFulltextIndexCreator, FulltextIndexCreator, TantivyFulltextIndexCreator,
};
use index::fulltext_index::{Analyzer, Config};
use index::target::IndexTarget;
use puffin::blob_metadata::CompressionCodec;
use puffin::puffin_manager::PutOptions;
use snafu::{ResultExt, ensure};
@@ -407,16 +408,22 @@ impl AltFulltextCreator {
) -> Result<ByteCount> {
match self {
Self::Tantivy(creator) => {
let key = format!("{INDEX_BLOB_TYPE_TANTIVY}-{}", column_id);
let blob_key = format!(
"{INDEX_BLOB_TYPE_TANTIVY}-{}",
IndexTarget::ColumnId(*column_id)
);
creator
.finish(puffin_writer, &key, put_options)
.finish(puffin_writer, &blob_key, put_options)
.await
.context(FulltextFinishSnafu)
}
Self::Bloom(creator) => {
let key = format!("{INDEX_BLOB_TYPE_BLOOM}-{}", column_id);
let blob_key = format!(
"{INDEX_BLOB_TYPE_BLOOM}-{}",
IndexTarget::ColumnId(*column_id)
);
creator
.finish(puffin_writer, &key, put_options)
.finish(puffin_writer, &blob_key, put_options)
.await
.context(FulltextFinishSnafu)
}

View File

@@ -15,4 +15,4 @@
pub(crate) mod applier;
pub(crate) mod creator;
const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1";
pub(crate) const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1";

View File

@@ -27,6 +27,7 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use index::inverted_index::search::index_apply::PredicatesIndexApplier;
use index::inverted_index::search::predicate::Predicate;
use index::target::IndexTarget;
use mito_codec::index::IndexValueCodec;
use mito_codec::row_converter::SortField;
use object_store::ObjectStore;
@@ -139,8 +140,13 @@ impl<'a> InvertedIndexApplierBuilder<'a> {
let predicates = self
.output
.iter()
.map(|(column_id, predicates)| (column_id.to_string(), predicates.clone()))
.collect();
.map(|(column_id, predicates)| {
(
format!("{}", IndexTarget::ColumnId(*column_id)),
predicates.clone(),
)
})
.collect::<Vec<_>>();
let applier = PredicatesIndexApplier::try_from(predicates);
Ok(Some(

View File

@@ -24,6 +24,7 @@ use index::inverted_index::create::InvertedIndexCreator;
use index::inverted_index::create::sort::external_sort::ExternalSorter;
use index::inverted_index::create::sort_create::SortIndexCreator;
use index::inverted_index::format::writer::InvertedIndexBlobWriter;
use index::target::IndexTarget;
use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
use mito_codec::row_converter::SortField;
use puffin::puffin_manager::{PuffinWriter, PutOptions};
@@ -72,7 +73,7 @@ pub struct InvertedIndexer {
/// The memory usage of the index creator.
memory_usage: Arc<AtomicUsize>,
/// Ids of indexed columns and their names (`to_string` of the column id).
/// Ids of indexed columns and their encoded target keys.
indexed_column_ids: Vec<(ColumnId, String)>,
/// Region metadata for column lookups.
@@ -115,8 +116,8 @@ impl InvertedIndexer {
let indexed_column_ids = indexed_column_ids
.into_iter()
.map(|col_id| {
let col_id_str = col_id.to_string();
(col_id, col_id_str)
let target_key = format!("{}", IndexTarget::ColumnId(col_id));
(col_id, target_key)
})
.collect();
Self {
@@ -181,7 +182,7 @@ impl InvertedIndexer {
let column_indices = self.column_index_cache.as_ref().unwrap();
for ((col_id, col_id_str), &column_index) in
for ((col_id, target_key), &column_index) in
self.indexed_column_ids.iter().zip(column_indices.iter())
{
if let Some(index) = column_index {
@@ -197,7 +198,7 @@ impl InvertedIndexer {
if value_ref.is_null() {
self.index_creator
.push_with_name(col_id_str, None)
.push_with_name(target_key, None)
.await
.context(PushIndexValueSnafu)?;
} else {
@@ -208,7 +209,7 @@ impl InvertedIndexer {
)
.context(EncodeSnafu)?;
self.index_creator
.push_with_name(col_id_str, Some(&self.value_buf))
.push_with_name(target_key, Some(&self.value_buf))
.await
.context(PushIndexValueSnafu)?;
}
@@ -286,7 +287,7 @@ impl InvertedIndexer {
let n = batch.num_rows();
guard.inc_row_count(n);
for (col_id, col_id_str) in &self.indexed_column_ids {
for (col_id, target_key) in &self.indexed_column_ids {
match self.codec.pk_col_info(*col_id) {
// pk
Some(col_info) => {
@@ -308,7 +309,7 @@ impl InvertedIndexer {
.transpose()?;
self.index_creator
.push_with_name_n(col_id_str, value, n)
.push_with_name_n(target_key, value, n)
.await
.context(PushIndexValueSnafu)?;
}
@@ -327,7 +328,7 @@ impl InvertedIndexer {
let value = values.data.get_ref(i);
if value.is_null() {
self.index_creator
.push_with_name(col_id_str, None)
.push_with_name(target_key, None)
.await
.context(PushIndexValueSnafu)?;
} else {
@@ -338,7 +339,7 @@ impl InvertedIndexer {
)
.context(EncodeSnafu)?;
self.index_creator
.push_with_name(col_id_str, Some(&self.value_buf))
.push_with_name(target_key, Some(&self.value_buf))
.await
.context(PushIndexValueSnafu)?;
}

View File

@@ -249,6 +249,115 @@ impl StorageSstEntry {
}
}
/// An entry describing puffin index metadata for inspection.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PuffinIndexMetaEntry {
/// The table directory this index belongs to.
pub table_dir: String,
/// The full path of the index file in object store.
pub index_file_path: String,
/// The region id referencing the index file.
pub region_id: RegionId,
/// The table id referencing the index file.
pub table_id: TableId,
/// The region number referencing the index file.
pub region_number: RegionNumber,
/// The region group referencing the index file.
pub region_group: RegionGroup,
/// The region sequence referencing the index file.
pub region_sequence: RegionSeq,
/// Engine-specific file identifier (string form).
pub file_id: String,
/// Size of the index file in object store (if available).
pub index_file_size: Option<u64>,
/// Logical index type (`bloom_filter`, `fulltext_bloom`, `fulltext_tantivy`, `inverted`).
pub index_type: String,
/// Target type (`column`, ...).
pub target_type: String,
/// Encoded target key string.
pub target_key: String,
/// Structured JSON describing the target.
pub target_json: String,
/// Size of the blob storing this target.
pub blob_size: u64,
/// Structured JSON describing index-specific metadata (if available).
pub meta_json: Option<String>,
/// Node id associated with the index file (if known).
pub node_id: Option<u64>,
}
impl PuffinIndexMetaEntry {
/// Returns the schema describing puffin index metadata entries.
pub fn schema() -> SchemaRef {
use datatypes::prelude::ConcreteDataType as Ty;
Arc::new(Schema::new(vec![
ColumnSchema::new("table_dir", Ty::string_datatype(), false),
ColumnSchema::new("index_file_path", Ty::string_datatype(), false),
ColumnSchema::new("region_id", Ty::uint64_datatype(), false),
ColumnSchema::new("table_id", Ty::uint32_datatype(), false),
ColumnSchema::new("region_number", Ty::uint32_datatype(), false),
ColumnSchema::new("region_group", Ty::uint8_datatype(), false),
ColumnSchema::new("region_sequence", Ty::uint32_datatype(), false),
ColumnSchema::new("file_id", Ty::string_datatype(), false),
ColumnSchema::new("index_file_size", Ty::uint64_datatype(), true),
ColumnSchema::new("index_type", Ty::string_datatype(), false),
ColumnSchema::new("target_type", Ty::string_datatype(), false),
ColumnSchema::new("target_key", Ty::string_datatype(), false),
ColumnSchema::new("target_json", Ty::string_datatype(), false),
ColumnSchema::new("blob_size", Ty::uint64_datatype(), false),
ColumnSchema::new("meta_json", Ty::string_datatype(), true),
ColumnSchema::new("node_id", Ty::uint64_datatype(), true),
]))
}
/// Converts a list of puffin index metadata entries to a record batch.
pub fn to_record_batch(entries: &[Self]) -> std::result::Result<DfRecordBatch, ArrowError> {
let schema = Self::schema();
let table_dirs = entries.iter().map(|e| e.table_dir.as_str());
let index_file_paths = entries.iter().map(|e| e.index_file_path.as_str());
let region_ids = entries.iter().map(|e| e.region_id.as_u64());
let table_ids = entries.iter().map(|e| e.table_id);
let region_numbers = entries.iter().map(|e| e.region_number);
let region_groups = entries.iter().map(|e| e.region_group);
let region_sequences = entries.iter().map(|e| e.region_sequence);
let file_ids = entries.iter().map(|e| e.file_id.as_str());
let index_file_sizes = entries.iter().map(|e| e.index_file_size);
let index_types = entries.iter().map(|e| e.index_type.as_str());
let target_types = entries.iter().map(|e| e.target_type.as_str());
let target_keys = entries.iter().map(|e| e.target_key.as_str());
let target_jsons = entries.iter().map(|e| e.target_json.as_str());
let blob_sizes = entries.iter().map(|e| e.blob_size);
let meta_jsons = entries.iter().map(|e| e.meta_json.as_deref());
let node_ids = entries.iter().map(|e| e.node_id);
let columns: Vec<ArrayRef> = vec![
Arc::new(StringArray::from_iter_values(table_dirs)),
Arc::new(StringArray::from_iter_values(index_file_paths)),
Arc::new(UInt64Array::from_iter_values(region_ids)),
Arc::new(UInt32Array::from_iter_values(table_ids)),
Arc::new(UInt32Array::from_iter_values(region_numbers)),
Arc::new(UInt8Array::from_iter_values(region_groups)),
Arc::new(UInt32Array::from_iter_values(region_sequences)),
Arc::new(StringArray::from_iter_values(file_ids)),
Arc::new(UInt64Array::from_iter(index_file_sizes)),
Arc::new(StringArray::from_iter_values(index_types)),
Arc::new(StringArray::from_iter_values(target_types)),
Arc::new(StringArray::from_iter_values(target_keys)),
Arc::new(StringArray::from_iter_values(target_jsons)),
Arc::new(UInt64Array::from_iter_values(blob_sizes)),
Arc::new(StringArray::from_iter(meta_jsons)),
Arc::new(UInt64Array::from_iter(node_ids)),
];
DfRecordBatch::try_new(schema.arrow_schema().clone(), columns)
}
/// Reserved internal inspect table name for puffin index metadata.
pub fn reserved_table_name_for_inspection() -> &'static str {
"__inspect/__mito/__puffin_index_meta"
}
}
fn build_plan_helper(
scan_request: ScanRequest,
table_name: &str,
@@ -577,6 +686,188 @@ mod tests {
assert!(node_ids.is_null(1));
}
#[test]
fn test_puffin_index_meta_to_record_batch() {
let entries = vec![
PuffinIndexMetaEntry {
table_dir: "table1".to_string(),
index_file_path: "index1".to_string(),
region_id: RegionId::with_group_and_seq(10, 0, 20),
table_id: 10,
region_number: 20,
region_group: 0,
region_sequence: 20,
file_id: "file1".to_string(),
index_file_size: Some(1024),
index_type: "bloom_filter".to_string(),
target_type: "column".to_string(),
target_key: "1".to_string(),
target_json: "{\"column\":1}".to_string(),
blob_size: 256,
meta_json: Some("{\"bloom\":{}}".to_string()),
node_id: Some(42),
},
PuffinIndexMetaEntry {
table_dir: "table2".to_string(),
index_file_path: "index2".to_string(),
region_id: RegionId::with_group_and_seq(11, 0, 21),
table_id: 11,
region_number: 21,
region_group: 0,
region_sequence: 21,
file_id: "file2".to_string(),
index_file_size: None,
index_type: "inverted".to_string(),
target_type: "unknown".to_string(),
target_key: "legacy".to_string(),
target_json: "{}".to_string(),
blob_size: 0,
meta_json: None,
node_id: None,
},
];
let schema = PuffinIndexMetaEntry::schema();
let batch = PuffinIndexMetaEntry::to_record_batch(&entries).unwrap();
assert_eq!(schema.arrow_schema().fields().len(), batch.num_columns());
assert_eq!(2, batch.num_rows());
let table_dirs = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!("table1", table_dirs.value(0));
assert_eq!("table2", table_dirs.value(1));
let index_file_paths = batch
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!("index1", index_file_paths.value(0));
assert_eq!("index2", index_file_paths.value(1));
let region_ids = batch
.column(2)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
assert_eq!(
RegionId::with_group_and_seq(10, 0, 20).as_u64(),
region_ids.value(0)
);
assert_eq!(
RegionId::with_group_and_seq(11, 0, 21).as_u64(),
region_ids.value(1)
);
let table_ids = batch
.column(3)
.as_any()
.downcast_ref::<UInt32Array>()
.unwrap();
assert_eq!(10, table_ids.value(0));
assert_eq!(11, table_ids.value(1));
let region_numbers = batch
.column(4)
.as_any()
.downcast_ref::<UInt32Array>()
.unwrap();
assert_eq!(20, region_numbers.value(0));
assert_eq!(21, region_numbers.value(1));
let region_groups = batch
.column(5)
.as_any()
.downcast_ref::<UInt8Array>()
.unwrap();
assert_eq!(0, region_groups.value(0));
assert_eq!(0, region_groups.value(1));
let region_sequences = batch
.column(6)
.as_any()
.downcast_ref::<UInt32Array>()
.unwrap();
assert_eq!(20, region_sequences.value(0));
assert_eq!(21, region_sequences.value(1));
let file_ids = batch
.column(7)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!("file1", file_ids.value(0));
assert_eq!("file2", file_ids.value(1));
let index_file_sizes = batch
.column(8)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
assert_eq!(1024, index_file_sizes.value(0));
assert!(index_file_sizes.is_null(1));
let index_types = batch
.column(9)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!("bloom_filter", index_types.value(0));
assert_eq!("inverted", index_types.value(1));
let target_types = batch
.column(10)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!("column", target_types.value(0));
assert_eq!("unknown", target_types.value(1));
let target_keys = batch
.column(11)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!("1", target_keys.value(0));
assert_eq!("legacy", target_keys.value(1));
let target_json = batch
.column(12)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!("{\"column\":1}", target_json.value(0));
assert_eq!("{}", target_json.value(1));
let blob_sizes = batch
.column(13)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
assert_eq!(256, blob_sizes.value(0));
assert_eq!(0, blob_sizes.value(1));
let meta_jsons = batch
.column(14)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!("{\"bloom\":{}}", meta_jsons.value(0));
assert!(meta_jsons.is_null(1));
let node_ids = batch
.column(15)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
assert_eq!(42, node_ids.value(0));
assert!(node_ids.is_null(1));
}
#[test]
fn test_manifest_build_plan() {
// Note: filter must reference a column in the projected schema