Compare commits


1 Commit

Author: Pascal Seitz
SHA1: 375d1f9dac
Message: prepare for merge
Date: 2023-01-24 17:18:41 +08:00
12 changed files with 118 additions and 112 deletions

View File

@@ -22,6 +22,9 @@ pub struct Column<T> {
}
impl<T: PartialOrd> Column<T> {
pub fn get_cardinality(&self) -> Cardinality {
self.idx.get_cardinality()
}
pub fn num_rows(&self) -> RowId {
match &self.idx {
ColumnIndex::Full => self.values.num_vals() as u32,

View File

@@ -1,9 +1,10 @@
use std::collections::HashMap;
use std::io;
use super::writer::ColumnarSerializer;
use crate::columnar::ColumnarReader;
use crate::dynamic_column::DynamicColumn;
use crate::ColumnType;
use crate::{Cardinality, ColumnType};
pub enum MergeDocOrder {
/// Columnar tables are simply stacked one above the other.
@@ -19,20 +20,30 @@ pub enum MergeDocOrder {
}
pub fn merge_columnar(
_columnar_readers: &[ColumnarReader],
columnar_readers: &[ColumnarReader],
mapping: MergeDocOrder,
_output: &mut impl io::Write,
output: &mut impl io::Write,
) -> io::Result<()> {
match mapping {
MergeDocOrder::Stack => {
// implement me :)
todo!();
}
MergeDocOrder::Complex(_) => {
// for later
todo!();
let mut serializer = ColumnarSerializer::new(output);
// TODO handle dictionary merge for Str/Bytes column
let field_name_to_group = group_columns_for_merge(columnar_readers)?;
for (column_name, category_to_columns) in field_name_to_group {
for (_category, columns_to_merge) in category_to_columns {
let column_type = columns_to_merge[0].column_type();
let mut column_serializer =
serializer.serialize_column(column_name.as_bytes(), column_type);
merge_columns(
column_type,
&columns_to_merge,
&mapping,
&mut column_serializer,
)?;
}
}
serializer.finalize()?;
Ok(())
}
/// Column types are grouped into different categories.
@@ -44,7 +55,7 @@ pub fn merge_columnar(
/// See also [README.md].
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
#[repr(u8)]
enum ColumnTypeCategory {
pub enum ColumnTypeCategory {
Bool,
Str,
Numerical,
@@ -68,8 +79,41 @@ impl From<ColumnType> for ColumnTypeCategory {
}
}
fn collect_columns(
columnar_readers: &[&ColumnarReader],
pub fn detect_cardinality(columns: &[DynamicColumn]) -> Cardinality {
if columns
.iter()
.any(|column| column.get_cardinality().is_multivalue())
{
return Cardinality::Multivalued;
}
if columns
.iter()
.any(|column| column.get_cardinality().is_optional())
{
return Cardinality::Optional;
}
Cardinality::Full
}
pub fn compute_num_docs(columns: &[DynamicColumn], mapping: &MergeDocOrder) -> usize {
// TODO handle deletes
0
}
pub fn merge_columns(
column_type: ColumnType,
columns: &[DynamicColumn],
mapping: &MergeDocOrder,
column_serializer: &mut impl io::Write,
) -> io::Result<()> {
let cardinality = detect_cardinality(columns);
Ok(())
}
pub fn group_columns_for_merge(
columnar_readers: &[ColumnarReader],
) -> io::Result<HashMap<String, HashMap<ColumnTypeCategory, Vec<DynamicColumn>>>> {
// Each column name may have multiple column types associated with it.
// For merging, we group columns by column type category, since columns of the same category can be merged.
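A minimal, self-contained sketch of the grouping idea described above: columns coming from all readers are bucketed first by column name and then by column type category, so that, for example, an i64 and an f64 column under the same name land in one Numerical group that can later be coerced and merged. Category and Ty below are local stand-ins for ColumnTypeCategory and ColumnType, not the crate's actual types.

use std::collections::HashMap;

#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
enum Category { Bool, Str, Numerical }

#[derive(Copy, Clone, Debug)]
enum Ty { Bool, Str, I64, U64, F64 }

// Map each concrete column type to its merge category
// (mirrors the role of From<ColumnType> for ColumnTypeCategory).
fn category(ty: Ty) -> Category {
    match ty {
        Ty::Bool => Category::Bool,
        Ty::Str => Category::Str,
        Ty::I64 | Ty::U64 | Ty::F64 => Category::Numerical,
    }
}

fn main() {
    // Columns collected from several readers: (column name, column type).
    let columns = vec![
        ("numbers", Ty::I64),
        ("numbers", Ty::F64),
        ("numbers", Ty::U64),
        ("name", Ty::Str),
        ("flag", Ty::Bool),
    ];
    // name -> category -> columns: the same shape as the map returned by group_columns_for_merge.
    let mut grouped: HashMap<&str, HashMap<Category, Vec<Ty>>> = HashMap::new();
    for (name, ty) in columns {
        grouped
            .entry(name)
            .or_default()
            .entry(category(ty))
            .or_default()
            .push(ty);
    }
    // All three numerical columns under "numbers" end up in a single mergeable group.
    assert_eq!(grouped["numbers"][&Category::Numerical].len(), 3);
    assert_eq!(grouped.len(), 3);
}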
@@ -117,26 +161,20 @@ fn cast_to_common_numerical_column(columns: &[DynamicColumn]) -> Vec<DynamicColu
.all(|column| column.column_type().numerical_type().is_some()));
let coerce_to_i64: Vec<_> = columns
.iter()
.map(|column| column.clone().coerce_to_i64())
.filter_map(|column| column.clone().coerce_to_i64())
.collect();
if coerce_to_i64.iter().all(|column| column.is_some()) {
return coerce_to_i64
.into_iter()
.map(|column| column.unwrap())
.collect();
if coerce_to_i64.len() == columns.len() {
return coerce_to_i64;
}
let coerce_to_u64: Vec<_> = columns
.iter()
.map(|column| column.clone().coerce_to_u64())
.filter_map(|column| column.clone().coerce_to_u64())
.collect();
if coerce_to_u64.iter().all(|column| column.is_some()) {
return coerce_to_u64
.into_iter()
.map(|column| column.unwrap())
.collect();
if coerce_to_u64.len() == columns.len() {
return coerce_to_u64;
}
columns
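The refactor above switches cast_to_common_numerical_column from map plus unwrap to filter_map plus a length check: the coerced vector is kept only if nothing was dropped, i.e. every column in the group could be coerced to the candidate type. A standalone sketch of that all-or-nothing pattern, using plain f64 values rather than the crate's DynamicColumn:

// Coerce every value to i64, or bail out if even one value cannot be coerced.
// Only integral values count as coercible here; range checks are omitted for brevity.
fn coerce_all_to_i64(values: &[f64]) -> Option<Vec<i64>> {
    let coerced: Vec<i64> = values
        .iter()
        .filter_map(|v| if v.fract() == 0.0 { Some(*v as i64) } else { None })
        .collect();
    // filter_map silently drops failures, so compare lengths to detect a partial result.
    (coerced.len() == values.len()).then_some(coerced)
}

fn main() {
    assert_eq!(coerce_all_to_i64(&[1.0, 2.0]), Some(vec![1, 2]));
    // 2.5 is not integral, so the whole group keeps its original type.
    assert_eq!(coerce_all_to_i64(&[1.0, 2.5]), None);
}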
@@ -183,7 +221,9 @@ mod tests {
ColumnarReader::open(buffer).unwrap()
};
let column_map = collect_columns(&[&columnar1, &columnar2, &columnar3]).unwrap();
let column_map =
group_columns_for_merge(&[columnar1.clone(), columnar2.clone(), columnar3.clone()])
.unwrap();
assert_eq!(column_map.len(), 1);
let cat_to_columns = column_map.get("numbers").unwrap();
assert_eq!(cat_to_columns.len(), 1);
@@ -191,14 +231,14 @@ mod tests {
let numerical = cat_to_columns.get(&ColumnTypeCategory::Numerical).unwrap();
assert!(numerical.iter().all(|column| column.is_f64()));
let column_map = collect_columns(&[&columnar1, &columnar1]).unwrap();
let column_map = group_columns_for_merge(&[columnar1.clone(), columnar1.clone()]).unwrap();
assert_eq!(column_map.len(), 1);
let cat_to_columns = column_map.get("numbers").unwrap();
assert_eq!(cat_to_columns.len(), 1);
let numerical = cat_to_columns.get(&ColumnTypeCategory::Numerical).unwrap();
assert!(numerical.iter().all(|column| column.is_i64()));
let column_map = collect_columns(&[&columnar2, &columnar2]).unwrap();
let column_map = group_columns_for_merge(&[columnar2.clone(), columnar2.clone()]).unwrap();
assert_eq!(column_map.len(), 1);
let cat_to_columns = column_map.get("numbers").unwrap();
assert_eq!(cat_to_columns.len(), 1);

View File

@@ -0,0 +1 @@

View File

@@ -1,6 +1,7 @@
mod column_type;
mod format_version;
mod merge;
mod merge_index;
mod reader;
mod writer;

View File

@@ -13,6 +13,7 @@ fn io_invalid_data(msg: String) -> io::Error {
/// The ColumnarReader makes it possible to access a set of columns
/// associated with field names.
#[derive(Clone)]
pub struct ColumnarReader {
column_dictionary: Dictionary<RangeSSTable>,
column_data: FileSlice,

View File

@@ -8,7 +8,7 @@ use std::net::Ipv6Addr;
use column_operation::ColumnOperation;
use common::CountingWriter;
use serializer::ColumnarSerializer;
pub(crate) use serializer::ColumnarSerializer;
use stacker::{Addr, ArenaHashMap, MemoryArena};
use crate::column_index::SerializableColumnIndex;

View File

@@ -8,7 +8,7 @@ use common::{HasLen, OwnedBytes};
use crate::column::{BytesColumn, Column, StrColumn};
use crate::column_values::{monotonic_map_column, StrictlyMonotonicFn};
use crate::columnar::ColumnType;
use crate::{DateTime, NumericalType};
use crate::{Cardinality, DateTime, NumericalType};
#[derive(Clone)]
pub enum DynamicColumn {
@@ -23,6 +23,18 @@ pub enum DynamicColumn {
}
impl DynamicColumn {
pub fn get_cardinality(&self) -> Cardinality {
match self {
DynamicColumn::Bool(c) => c.get_cardinality(),
DynamicColumn::I64(c) => c.get_cardinality(),
DynamicColumn::U64(c) => c.get_cardinality(),
DynamicColumn::F64(c) => c.get_cardinality(),
DynamicColumn::IpAddr(c) => c.get_cardinality(),
DynamicColumn::DateTime(c) => c.get_cardinality(),
DynamicColumn::Bytes(c) => c.ords().get_cardinality(),
DynamicColumn::Str(c) => c.ords().get_cardinality(),
}
}
pub fn column_type(&self) -> ColumnType {
match self {
DynamicColumn::Bool(_) => ColumnType::Bool,

View File

@@ -62,6 +62,12 @@ pub enum Cardinality {
}
impl Cardinality {
pub fn is_optional(&self) -> bool {
matches!(self, Cardinality::Optional)
}
pub fn is_multivalue(&self) -> bool {
matches!(self, Cardinality::Multivalued)
}
pub(crate) fn to_code(self) -> u8 {
self as u8
}
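The is_optional and is_multivalue helpers added here are what detect_cardinality in merge.rs builds on to pick the widest cardinality among the columns being merged: any multivalued input forces Multivalued, otherwise any optional input forces Optional, otherwise the merged column stays Full. A small sketch of that rule, with a local enum mirroring Cardinality for illustration only:

#[derive(Copy, Clone, Eq, PartialEq, Debug)]
enum Cardinality { Full, Optional, Multivalued }

// Widest cardinality wins: Multivalued > Optional > Full.
fn widest(cards: &[Cardinality]) -> Cardinality {
    if cards.iter().any(|c| *c == Cardinality::Multivalued) {
        return Cardinality::Multivalued;
    }
    if cards.iter().any(|c| *c == Cardinality::Optional) {
        return Cardinality::Optional;
    }
    Cardinality::Full
}

fn main() {
    assert_eq!(widest(&[Cardinality::Full, Cardinality::Optional]), Cardinality::Optional);
    assert_eq!(widest(&[Cardinality::Optional, Cardinality::Multivalued]), Cardinality::Multivalued);
    assert_eq!(widest(&[Cardinality::Full, Cardinality::Full]), Cardinality::Full);
}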

View File

@@ -11,7 +11,7 @@ use crate::aggregation::agg_req_with_accessor::{
use crate::aggregation::intermediate_agg_result::{
IntermediateBucketResult, IntermediateTermBucketEntry, IntermediateTermBucketResult,
};
use crate::aggregation::segment_agg_result::SegmentAggregationResultsCollector;
use crate::aggregation::segment_agg_result::{BucketCount, SegmentAggregationResultsCollector};
use crate::error::DataCorruption;
use crate::fastfield::MultiValuedFastFieldReader;
use crate::schema::Type;
@@ -268,18 +268,21 @@ impl TermBuckets {
term_ids: &[u64],
doc: DocId,
sub_aggregation: &AggregationsWithAccessor,
bucket_count: &BucketCount,
blueprint: &Option<SegmentAggregationResultsCollector>,
) -> crate::Result<()> {
for &term_id in term_ids {
let entry = self
.entries
.entry(term_id as u32)
.or_insert_with(|| TermBucketEntry::from_blueprint(blueprint));
let entry = self.entries.entry(term_id as u32).or_insert_with(|| {
bucket_count.add_count(1);
TermBucketEntry::from_blueprint(blueprint)
});
entry.doc_count += 1;
if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() {
sub_aggregations.collect(doc, sub_aggregation)?;
}
}
bucket_count.validate_bucket_count()?;
Ok(())
}
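increment_bucket now receives a BucketCount, charges it exactly when a new term bucket is created, and validates the total before returning, instead of counting buckets only once at the end of collection. Below is a hedged, standalone sketch of that guard pattern; the field and method names follow the usage visible in this diff (and in the bench change further down), but the type is illustrative, not the crate's BucketCount, which shares its counter across aggregations and returns a crate error rather than a String.

use std::cell::Cell;

// Illustrative bucket-count guard: a running counter plus a hard ceiling.
struct BucketCount {
    bucket_count: Cell<u32>,
    max_bucket_count: u32,
}

impl BucketCount {
    // Charge the guard whenever new buckets are created.
    fn add_count(&self, count: u32) {
        self.bucket_count.set(self.bucket_count.get() + count);
    }

    // Fail once the number of buckets exceeds the configured maximum.
    fn validate_bucket_count(&self) -> Result<(), String> {
        if self.bucket_count.get() > self.max_bucket_count {
            return Err("aggregation created too many buckets".to_string());
        }
        Ok(())
    }
}

fn main() {
    let bucket_count = BucketCount {
        bucket_count: Cell::new(0),
        max_bucket_count: 2,
    };
    bucket_count.add_count(1);
    bucket_count.add_count(1);
    assert!(bucket_count.validate_bucket_count().is_ok());
    // The third bucket pushes the count over the limit.
    bucket_count.add_count(1);
    assert!(bucket_count.validate_bucket_count().is_err());
}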
@@ -369,7 +372,7 @@ impl SegmentTermCollector {
}
OrderTarget::SubAggregation(_name) => {
// don't sort and cut off since it's hard to make assumptions on the quality of the
// results when cutting off due to unknown nature of the sub_aggregation (possible
// results when cutting off du to unknown nature of the sub_aggregation (possible
// to check).
}
OrderTarget::Count => {
@@ -409,10 +412,6 @@ impl SegmentTermCollector {
if self.req.min_doc_count == 0 {
let mut stream = term_dict.stream()?;
while let Some((key, _ord)) = stream.next() {
if dict.len() >= self.req.segment_size as usize {
break;
}
let key = std::str::from_utf8(key)
.map_err(|utf8_err| DataCorruption::comment_only(utf8_err.to_string()))?;
if !dict.contains_key(key) {
@@ -434,8 +433,6 @@ impl SegmentTermCollector {
sum_other_doc_count += sum_other_docs;
dict = dict_entries.into_iter().collect();
}
agg_with_accessor.bucket_count.add_count(dict.len() as u32);
agg_with_accessor.bucket_count.validate_bucket_count()?;
Ok(IntermediateBucketResult::Terms(
IntermediateTermBucketResult {
@@ -472,24 +469,28 @@ impl SegmentTermCollector {
&vals1,
docs[0],
&bucket_with_accessor.sub_aggregation,
&bucket_with_accessor.bucket_count,
&self.blueprint,
)?;
self.term_buckets.increment_bucket(
&vals2,
docs[1],
&bucket_with_accessor.sub_aggregation,
&bucket_with_accessor.bucket_count,
&self.blueprint,
)?;
self.term_buckets.increment_bucket(
&vals3,
docs[2],
&bucket_with_accessor.sub_aggregation,
&bucket_with_accessor.bucket_count,
&self.blueprint,
)?;
self.term_buckets.increment_bucket(
&vals4,
docs[3],
&bucket_with_accessor.sub_aggregation,
&bucket_with_accessor.bucket_count,
&self.blueprint,
)?;
}
@@ -500,6 +501,7 @@ impl SegmentTermCollector {
&vals1,
doc,
&bucket_with_accessor.sub_aggregation,
&bucket_with_accessor.bucket_count,
&self.blueprint,
)?;
}
@@ -1134,33 +1136,6 @@ mod tests {
assert_eq!(res["my_texts"]["sum_other_doc_count"], 0);
assert_eq!(res["my_texts"]["doc_count_error_upper_bound"], 0);
let agg_req: Aggregations = vec![(
"my_texts".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
field: "string_id".to_string(),
min_doc_count: Some(0),
size: Some(1),
..Default::default()
}),
sub_aggregation: Default::default(),
}),
)]
.into_iter()
.collect();
// searching for terma, but min_doc_count will return all terms
let res = exec_request_with_query(agg_req, &index, Some(("string_id", "terma")))?;
assert_eq!(res["my_texts"]["buckets"][0]["key"], "terma");
assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 4);
assert_eq!(
res["my_texts"]["buckets"][1]["key"],
serde_json::Value::Null
);
assert_eq!(res["my_texts"]["sum_other_doc_count"], 0);
assert_eq!(res["my_texts"]["doc_count_error_upper_bound"], 0);
Ok(())
}
@@ -1239,27 +1214,6 @@ mod tests {
let index = get_test_index_from_terms(true, &terms_per_segment)?;
let agg_req: Aggregations = vec![(
"my_texts".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
field: "string_id".to_string(),
// min_doc_count: Some(0),
..Default::default()
}),
sub_aggregation: Default::default(),
}),
)]
.into_iter()
.collect();
let res = exec_request_with_query(agg_req, &index, None);
assert!(res.is_ok());
// This request has min_doc_count set to 0
// That means we load potentially the whole dict
// Make sure the bucket count is still fine
let agg_req: Aggregations = vec![(
"my_texts".to_string(),
Aggregation::Bucket(BucketAggregation {
@@ -1274,24 +1228,6 @@ mod tests {
.into_iter()
.collect();
let res = exec_request_with_query(agg_req, &index, None);
assert!(res.is_ok());
let agg_req: Aggregations = vec![(
"my_texts".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
field: "string_id".to_string(),
// min_doc_count: Some(0),
size: Some(70_000),
..Default::default()
}),
sub_aggregation: Default::default(),
}),
)]
.into_iter()
.collect();
let res = exec_request_with_query(agg_req, &index, None);
assert!(res.is_err());
@@ -1448,10 +1384,14 @@ mod bench {
let mut collector = get_collector_with_buckets(total_terms);
let vals = get_rand_terms(total_terms, num_terms);
let aggregations_with_accessor: AggregationsWithAccessor = Default::default();
let bucket_count: BucketCount = BucketCount {
bucket_count: Default::default(),
max_bucket_count: 1_000_001u32,
};
b.iter(|| {
for &val in &vals {
collector
.increment_bucket(&[val], 0, &aggregations_with_accessor, &None)
.increment_bucket(&[val], 0, &aggregations_with_accessor, &bucket_count, &None)
.unwrap();
}
})

View File

@@ -30,6 +30,7 @@ use crate::{BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, TermOrdinal};
/// block boundary.
///
/// (See also README.md)
#[derive(Debug, Clone)]
pub struct Dictionary<TSSTable: SSTable> {
pub sstable_slice: FileSlice,
pub sstable_index: SSTableIndex,

View File

@@ -117,6 +117,7 @@ impl SSTable for MonotonicU64SSTable {
/// `range_sstable[k1].end == range_sstable[k2].start`.
///
/// The first range is not required to start at `0`.
#[derive(Clone, Copy, Debug)]
pub struct RangeSSTable;
impl SSTable for RangeSSTable {
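The doc comment above states the RangeSSTable invariant that consecutive ranges share a boundary (range_sstable[k1].end == range_sstable[k2].start) while the first range need not start at 0. A tiny standalone check of that property, using a hypothetical helper that is not part of the crate:

use std::ops::Range;

// True if every range starts exactly where the previous one ends.
fn ranges_are_contiguous(ranges: &[Range<u64>]) -> bool {
    ranges.windows(2).all(|pair| pair[0].end == pair[1].start)
}

fn main() {
    // Contiguous, even though the first range does not start at 0.
    assert!(ranges_are_contiguous(&[10..20, 20..35, 35..35]));
    // A gap between 10 and 12 violates the invariant.
    assert!(!ranges_are_contiguous(&[0..10, 12..20]));
}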

View File

@@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
use crate::{common_prefix_len, SSTableDataCorruption, TermOrdinal};
#[derive(Default, Debug, Serialize, Deserialize)]
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct SSTableIndex {
blocks: Vec<BlockMeta>,
}
@@ -75,7 +75,7 @@ pub struct BlockAddr {
pub first_ordinal: u64,
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct BlockMeta {
/// Any byte string that is lexicographically greater or equal to
/// the last key in the block,