feat: add more h3 scalar functions (#4707)

* feat: add more h3 scalar functions

* chore: comment up
This commit is contained in:
Ning Sun
2024-09-20 12:19:50 +08:00
committed by GitHub
parent e12ffbeb2f
commit 75c6fad1a3
6 changed files with 857 additions and 115 deletions

View File

@@ -27,6 +27,7 @@ common-time.workspace = true
common-version.workspace = true
datafusion.workspace = true
datatypes.workspace = true
derive_more = { version = "1", default-features = false, features = ["display"] }
geohash = { version = "0.13", optional = true }
h3o = { version = "0.6", optional = true }
jsonb.workspace = true

View File

@@ -17,7 +17,6 @@ mod geohash;
mod h3;
use geohash::GeohashFunction;
use h3::H3Function;
use crate::function_registry::FunctionRegistry;
@@ -25,7 +24,20 @@ pub(crate) struct GeoFunctions;
impl GeoFunctions {
/// Registers the geospatial scalar functions listed below into `registry`.
///
/// Each function is wrapped in an `Arc` and handed to
/// `FunctionRegistry::register`, one call per function.
pub fn register(registry: &FunctionRegistry) {
// geohash
registry.register(Arc::new(GeohashFunction));
registry.register(Arc::new(H3Function));
// h3 family
registry.register(Arc::new(h3::H3LatLngToCell));
registry.register(Arc::new(h3::H3LatLngToCellString));
registry.register(Arc::new(h3::H3CellBase));
registry.register(Arc::new(h3::H3CellCenterChild));
registry.register(Arc::new(h3::H3CellCenterLat));
registry.register(Arc::new(h3::H3CellCenterLng));
registry.register(Arc::new(h3::H3CellIsPentagon));
registry.register(Arc::new(h3::H3CellParent));
registry.register(Arc::new(h3::H3CellResolution));
registry.register(Arc::new(h3::H3CellToString));
registry.register(Arc::new(h3::H3IsNeighbour));
registry.register(Arc::new(h3::H3StringToCell));
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use std::str::FromStr;
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
@@ -22,23 +22,118 @@ use datafusion::logical_expr::Volatility;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::value::Value;
use datatypes::vectors::{MutableVector, StringVectorBuilder, VectorRef};
use h3o::{LatLng, Resolution};
use datatypes::vectors::{
BooleanVectorBuilder, Float64VectorBuilder, MutableVector, StringVectorBuilder,
UInt64VectorBuilder, UInt8VectorBuilder, VectorRef,
};
use derive_more::Display;
use h3o::{CellIndex, LatLng, Resolution};
use snafu::{ensure, ResultExt};
use crate::function::{Function, FunctionContext};
/// Function that returns [h3] encoding string for a given geospatial coordinate.
/// Function that returns [h3] encoding cellid for a given geospatial coordinate.
///
/// [h3]: https://h3geo.org/
#[derive(Clone, Debug, Default)]
pub struct H3Function;
#[derive(Clone, Debug, Default, Display)]
#[display("{}", self.name())]
pub struct H3LatLngToCell;
const NAME: &str = "h3";
impl Function for H3Function {
impl Function for H3LatLngToCell {
fn name(&self) -> &str {
NAME
"h3_latlng_to_cell"
}
/// The encoded cell id is reported as `UInt64`; the input types are not consulted.
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
    let cell_id_type = ConcreteDataType::uint64_datatype();
    Ok(cell_id_type)
}
/// Builds one exact signature `(latitude, longitude, resolution)` for every
/// combination of a float coordinate type and an integer resolution type.
///
/// Latitude and longitude always share the same coordinate type.
fn signature(&self) -> Signature {
    let coord_types = [
        ConcreteDataType::float32_datatype(),
        ConcreteDataType::float64_datatype(),
    ];
    let resolution_types = [
        ConcreteDataType::int8_datatype(),
        ConcreteDataType::int16_datatype(),
        ConcreteDataType::int32_datatype(),
        ConcreteDataType::int64_datatype(),
        ConcreteDataType::uint8_datatype(),
        ConcreteDataType::uint16_datatype(),
        ConcreteDataType::uint32_datatype(),
        ConcreteDataType::uint64_datatype(),
    ];

    // Coordinate type varies slowest, resolution type fastest.
    let signatures: Vec<_> = coord_types
        .iter()
        .flat_map(|coord_type| {
            resolution_types.iter().map(move |resolution_type| {
                TypeSignature::Exact(vec![
                    // latitude
                    coord_type.clone(),
                    // longitude
                    coord_type.clone(),
                    // resolution
                    resolution_type.clone(),
                ])
            })
        })
        .collect();

    Signature::one_of(signatures, Volatility::Stable)
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 3,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 3, provided : {}",
columns.len()
),
}
);
let lat_vec = &columns[0];
let lon_vec = &columns[1];
let resolution_vec = &columns[2];
let size = lat_vec.len();
let mut results = UInt64VectorBuilder::with_capacity(size);
for i in 0..size {
let lat = lat_vec.get(i).as_f64_lossy();
let lon = lon_vec.get(i).as_f64_lossy();
let r = value_to_resolution(resolution_vec.get(i))?;
let result = match (lat, lon) {
(Some(lat), Some(lon)) => {
let coord = LatLng::new(lat, lon)
.map_err(|e| {
BoxedError::new(PlainError::new(
format!("H3 error: {}", e),
StatusCode::EngineExecuteQuery,
))
})
.context(error::ExecuteSnafu)?;
let encoded: u64 = coord.to_cell(r).into();
Some(encoded)
}
_ => None,
};
results.push(result);
}
Ok(results.to_vector())
}
}
/// Function that returns [h3] encoding cellid in string form for a given
/// geospatial coordinate.
///
/// [h3]: https://h3geo.org/
#[derive(Clone, Debug, Default, Display)]
#[display("{}", self.name())]
pub struct H3LatLngToCellString;
impl Function for H3LatLngToCellString {
fn name(&self) -> &str {
"h3_latlng_to_cell_string"
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
@@ -95,17 +190,7 @@ impl Function for H3Function {
for i in 0..size {
let lat = lat_vec.get(i).as_f64_lossy();
let lon = lon_vec.get(i).as_f64_lossy();
let r = match resolution_vec.get(i) {
Value::Int8(v) => v as u8,
Value::Int16(v) => v as u8,
Value::Int32(v) => v as u8,
Value::Int64(v) => v as u8,
Value::UInt8(v) => v,
Value::UInt16(v) => v as u8,
Value::UInt32(v) => v as u8,
Value::UInt64(v) => v as u8,
_ => unreachable!(),
};
let r = value_to_resolution(resolution_vec.get(i))?;
let result = match (lat, lon) {
(Some(lat), Some(lon)) => {
@@ -117,14 +202,6 @@ impl Function for H3Function {
))
})
.context(error::ExecuteSnafu)?;
let r = Resolution::try_from(r)
.map_err(|e| {
BoxedError::new(PlainError::new(
format!("H3 error: {}", e),
StatusCode::EngineExecuteQuery,
))
})
.context(error::ExecuteSnafu)?;
let encoded = coord.to_cell(r).to_string();
Some(encoded)
}
@@ -138,8 +215,585 @@ impl Function for H3Function {
}
}
impl fmt::Display for H3Function {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", NAME)
/// Scalar function `h3_cell_to_string`: renders an H3 cell id in its string form.
///
/// Takes a single cell-id column (see `signature_of_cell`). Rows for which
/// `cell_from_value` returns `None` are pushed as `None`.
#[derive(Clone, Debug, Default, Display)]
#[display("{}", self.name())]
pub struct H3CellToString;

impl Function for H3CellToString {
    fn name(&self) -> &str {
        "h3_cell_to_string"
    }

    fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
        Ok(ConcreteDataType::string_datatype())
    }

    fn signature(&self) -> Signature {
        signature_of_cell()
    }

    fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
        ensure!(
            columns.len() == 1,
            InvalidFuncArgsSnafu {
                err_msg: format!(
                    "The length of the args is not correct, expect 1, provided : {}",
                    columns.len()
                ),
            }
        );

        let cells = &columns[0];
        let row_count = cells.len();
        let mut builder = StringVectorBuilder::with_capacity(row_count);

        for row in 0..row_count {
            let rendered = cell_from_value(cells.get(row))?.map(|cell| cell.to_string());
            // The builder takes `Option<&str>`, so borrow the owned string.
            builder.push(rendered.as_deref());
        }

        Ok(builder.to_vector())
    }
}
/// Function that converts cell string id to uint64 number
#[derive(Clone, Debug, Default, Display)]
#[display("{}", self.name())]
pub struct H3StringToCell;
impl Function for H3StringToCell {
fn name(&self) -> &str {
"h3_string_to_cell"
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::uint64_datatype())
}
fn signature(&self) -> Signature {
Signature::new(
TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]),
Volatility::Stable,
)
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 1, provided : {}",
columns.len()
),
}
);
let string_vec = &columns[0];
let size = string_vec.len();
let mut results = UInt64VectorBuilder::with_capacity(size);
for i in 0..size {
let cell = string_vec.get(i);
let cell_id = match cell {
Value::String(v) => Some(
CellIndex::from_str(v.as_utf8())
.map_err(|e| {
BoxedError::new(PlainError::new(
format!("H3 error: {}", e),
StatusCode::EngineExecuteQuery,
))
})
.context(error::ExecuteSnafu)?
.into(),
),
_ => None,
};
results.push(cell_id);
}
Ok(results.to_vector())
}
}
/// Scalar function `h3_cell_center_lat`: latitude of the centroid of an H3 cell.
///
/// The cell is converted with `LatLng::from` and its `lat()` is returned as
/// `Float64`. Rows for which `cell_from_value` returns `None` are pushed as `None`.
#[derive(Clone, Debug, Default, Display)]
#[display("{}", self.name())]
pub struct H3CellCenterLat;

impl Function for H3CellCenterLat {
    fn name(&self) -> &str {
        "h3_cell_center_lat"
    }

    fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
        Ok(ConcreteDataType::float64_datatype())
    }

    fn signature(&self) -> Signature {
        signature_of_cell()
    }

    fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
        ensure!(
            columns.len() == 1,
            InvalidFuncArgsSnafu {
                err_msg: format!(
                    "The length of the args is not correct, expect 1, provided : {}",
                    columns.len()
                ),
            }
        );

        let cells = &columns[0];
        let row_count = cells.len();
        let mut builder = Float64VectorBuilder::with_capacity(row_count);

        for row in 0..row_count {
            let latitude =
                cell_from_value(cells.get(row))?.map(|cell| LatLng::from(cell).lat());
            builder.push(latitude);
        }

        Ok(builder.to_vector())
    }
}
/// Scalar function `h3_cell_center_lng`: longitude of the centroid of an H3 cell.
///
/// The cell is converted with `LatLng::from` and its `lng()` is returned as
/// `Float64`. Rows for which `cell_from_value` returns `None` are pushed as `None`.
#[derive(Clone, Debug, Default, Display)]
#[display("{}", self.name())]
pub struct H3CellCenterLng;

impl Function for H3CellCenterLng {
    fn name(&self) -> &str {
        "h3_cell_center_lng"
    }

    fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
        Ok(ConcreteDataType::float64_datatype())
    }

    fn signature(&self) -> Signature {
        signature_of_cell()
    }

    fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
        ensure!(
            columns.len() == 1,
            InvalidFuncArgsSnafu {
                err_msg: format!(
                    "The length of the args is not correct, expect 1, provided : {}",
                    columns.len()
                ),
            }
        );

        let cells = &columns[0];
        let row_count = cells.len();
        let mut builder = Float64VectorBuilder::with_capacity(row_count);

        for row in 0..row_count {
            let longitude =
                cell_from_value(cells.get(row))?.map(|cell| LatLng::from(cell).lng());
            builder.push(longitude);
        }

        Ok(builder.to_vector())
    }
}
/// Scalar function `h3_cell_resolution`: resolution of an H3 cell as `UInt8`.
///
/// Rows for which `cell_from_value` returns `None` are pushed as `None`.
#[derive(Clone, Debug, Default, Display)]
#[display("{}", self.name())]
pub struct H3CellResolution;

impl Function for H3CellResolution {
    fn name(&self) -> &str {
        "h3_cell_resolution"
    }

    fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
        Ok(ConcreteDataType::uint8_datatype())
    }

    fn signature(&self) -> Signature {
        signature_of_cell()
    }

    fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
        ensure!(
            columns.len() == 1,
            InvalidFuncArgsSnafu {
                err_msg: format!(
                    "The length of the args is not correct, expect 1, provided : {}",
                    columns.len()
                ),
            }
        );

        let cells = &columns[0];
        let row_count = cells.len();
        let mut builder = UInt8VectorBuilder::with_capacity(row_count);

        for row in 0..row_count {
            let resolution =
                cell_from_value(cells.get(row))?.map(|cell| u8::from(cell.resolution()));
            builder.push(resolution);
        }

        Ok(builder.to_vector())
    }
}
/// Scalar function `h3_cell_base`: base cell number of an H3 cell as `UInt8`.
///
/// Rows for which `cell_from_value` returns `None` are pushed as `None`.
#[derive(Clone, Debug, Default, Display)]
#[display("{}", self.name())]
pub struct H3CellBase;

impl Function for H3CellBase {
    fn name(&self) -> &str {
        "h3_cell_base"
    }

    fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
        Ok(ConcreteDataType::uint8_datatype())
    }

    fn signature(&self) -> Signature {
        signature_of_cell()
    }

    fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
        ensure!(
            columns.len() == 1,
            InvalidFuncArgsSnafu {
                err_msg: format!(
                    "The length of the args is not correct, expect 1, provided : {}",
                    columns.len()
                ),
            }
        );

        let cells = &columns[0];
        let row_count = cells.len();
        let mut builder = UInt8VectorBuilder::with_capacity(row_count);

        for row in 0..row_count {
            let base_cell =
                cell_from_value(cells.get(row))?.map(|cell| u8::from(cell.base_cell()));
            builder.push(base_cell);
        }

        Ok(builder.to_vector())
    }
}
/// Scalar function `h3_cell_is_pentagon`: whether an H3 cell is a pentagon.
///
/// Rows for which `cell_from_value` returns `None` are pushed as `None`.
#[derive(Clone, Debug, Default, Display)]
#[display("{}", self.name())]
pub struct H3CellIsPentagon;

impl Function for H3CellIsPentagon {
    fn name(&self) -> &str {
        "h3_cell_is_pentagon"
    }

    fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
        Ok(ConcreteDataType::boolean_datatype())
    }

    fn signature(&self) -> Signature {
        signature_of_cell()
    }

    fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
        ensure!(
            columns.len() == 1,
            InvalidFuncArgsSnafu {
                err_msg: format!(
                    "The length of the args is not correct, expect 1, provided : {}",
                    columns.len()
                ),
            }
        );

        let cells = &columns[0];
        let row_count = cells.len();
        let mut builder = BooleanVectorBuilder::with_capacity(row_count);

        for row in 0..row_count {
            let is_pentagon = cell_from_value(cells.get(row))?.map(|cell| cell.is_pentagon());
            builder.push(is_pentagon);
        }

        Ok(builder.to_vector())
    }
}
/// Function that returns center child cell of given cell id
#[derive(Clone, Debug, Default, Display)]
#[display("{}", self.name())]
pub struct H3CellCenterChild;
impl Function for H3CellCenterChild {
fn name(&self) -> &str {
"h3_cell_center_child"
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::uint64_datatype())
}
fn signature(&self) -> Signature {
signature_of_cell_and_resolution()
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 2, provided : {}",
columns.len()
),
}
);
let cell_vec = &columns[0];
let res_vec = &columns[1];
let size = cell_vec.len();
let mut results = UInt64VectorBuilder::with_capacity(size);
for i in 0..size {
let cell = cell_from_value(cell_vec.get(i))?;
let res = value_to_resolution(res_vec.get(i))?;
let result = cell
.and_then(|cell| cell.center_child(res))
.map(|c| c.into());
results.push(result);
}
Ok(results.to_vector())
}
}
/// Function that returns parent cell of given cell id and resolution
#[derive(Clone, Debug, Default, Display)]
#[display("{}", self.name())]
pub struct H3CellParent;
impl Function for H3CellParent {
fn name(&self) -> &str {
"h3_cell_parent"
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::uint64_datatype())
}
fn signature(&self) -> Signature {
signature_of_cell_and_resolution()
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 2, provided : {}",
columns.len()
),
}
);
let cell_vec = &columns[0];
let res_vec = &columns[1];
let size = cell_vec.len();
let mut results = UInt64VectorBuilder::with_capacity(size);
for i in 0..size {
let cell = cell_from_value(cell_vec.get(i))?;
let res = value_to_resolution(res_vec.get(i))?;
let result = cell.and_then(|cell| cell.parent(res)).map(|c| c.into());
results.push(result);
}
Ok(results.to_vector())
}
}
/// Function that checks if two cells are neighbour
#[derive(Clone, Debug, Default, Display)]
#[display("{}", self.name())]
pub struct H3IsNeighbour;
impl Function for H3IsNeighbour {
fn name(&self) -> &str {
"h3_is_neighbour"
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::boolean_datatype())
}
fn signature(&self) -> Signature {
signature_of_double_cell()
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 2, provided : {}",
columns.len()
),
}
);
let cell_vec = &columns[0];
let cell2_vec = &columns[1];
let size = cell_vec.len();
let mut results = BooleanVectorBuilder::with_capacity(size);
for i in 0..size {
let result = match (
cell_from_value(cell_vec.get(i))?,
cell_from_value(cell2_vec.get(i))?,
) {
(Some(cell_this), Some(cell_that)) => {
let is_neighbour = cell_this
.is_neighbor_with(cell_that)
.map_err(|e| {
BoxedError::new(PlainError::new(
format!("H3 error: {}", e),
StatusCode::EngineExecuteQuery,
))
})
.context(error::ExecuteSnafu)?;
Some(is_neighbour)
}
_ => None,
};
results.push(result);
}
Ok(results.to_vector())
}
}
/// Converts an integer [`Value`] into an H3 [`Resolution`].
///
/// # Errors
///
/// Returns an execution error when `v` is not one of the integer variants
/// (for example a NULL), when the integer does not fit in a `u8`, or when
/// `Resolution::try_from` rejects the narrowed value.
fn value_to_resolution(v: Value) -> Result<Resolution> {
    // Checked narrowing: an `as u8` cast would wrap 256 to 0 (and 257 to 1)
    // and silently accept an out-of-range resolution.
    let narrowed = match v {
        Value::Int8(v) => u8::try_from(v).ok(),
        Value::Int16(v) => u8::try_from(v).ok(),
        Value::Int32(v) => u8::try_from(v).ok(),
        Value::Int64(v) => u8::try_from(v).ok(),
        Value::UInt8(v) => Some(v),
        Value::UInt16(v) => u8::try_from(v).ok(),
        Value::UInt32(v) => u8::try_from(v).ok(),
        Value::UInt64(v) => u8::try_from(v).ok(),
        // NULL or a non-integer value: report an error instead of panicking.
        _ => None,
    };

    narrowed
        .ok_or_else(|| "resolution must be a non-null integer that fits in u8".to_string())
        .and_then(|r| Resolution::try_from(r).map_err(|e| e.to_string()))
        .map_err(|msg| {
            BoxedError::new(PlainError::new(
                format!("H3 error: {}", msg),
                StatusCode::EngineExecuteQuery,
            ))
        })
        .context(error::ExecuteSnafu)
}
/// Signature for functions taking a single cell id, given as either
/// `UInt64` or `Int64`.
fn signature_of_cell() -> Signature {
    let signatures: Vec<_> = [
        ConcreteDataType::uint64_datatype(),
        ConcreteDataType::int64_datatype(),
    ]
    .iter()
    .map(|cell_type| TypeSignature::Exact(vec![cell_type.clone()]))
    .collect();

    Signature::one_of(signatures, Volatility::Stable)
}
/// Signature for functions taking two cell ids, each independently given as
/// `UInt64` or `Int64`.
fn signature_of_double_cell() -> Signature {
    let cell_types = [
        ConcreteDataType::uint64_datatype(),
        ConcreteDataType::int64_datatype(),
    ];

    // First argument type varies slowest, second fastest.
    let signatures: Vec<_> = cell_types
        .iter()
        .flat_map(|first| {
            cell_types
                .iter()
                .map(move |second| TypeSignature::Exact(vec![first.clone(), second.clone()]))
        })
        .collect();

    Signature::one_of(signatures, Volatility::Stable)
}
/// Signature for functions taking a cell id (`UInt64` or `Int64`) followed by
/// a resolution of any signed or unsigned integer type.
fn signature_of_cell_and_resolution() -> Signature {
    let cell_types = [
        ConcreteDataType::uint64_datatype(),
        ConcreteDataType::int64_datatype(),
    ];
    let resolution_types = [
        ConcreteDataType::int8_datatype(),
        ConcreteDataType::int16_datatype(),
        ConcreteDataType::int32_datatype(),
        ConcreteDataType::int64_datatype(),
        ConcreteDataType::uint8_datatype(),
        ConcreteDataType::uint16_datatype(),
        ConcreteDataType::uint32_datatype(),
        ConcreteDataType::uint64_datatype(),
    ];

    // Cell type varies slowest, resolution type fastest.
    let signatures: Vec<_> = cell_types
        .iter()
        .flat_map(|cell_type| {
            resolution_types.iter().map(move |resolution_type| {
                TypeSignature::Exact(vec![cell_type.clone(), resolution_type.clone()])
            })
        })
        .collect();

    Signature::one_of(signatures, Volatility::Stable)
}
fn cell_from_value(v: Value) -> Result<Option<CellIndex>> {
let cell = match v {
Value::Int64(v) => Some(
CellIndex::try_from(v as u64)
.map_err(|e| {
BoxedError::new(PlainError::new(
format!("H3 error: {}", e),
StatusCode::EngineExecuteQuery,
))
})
.context(error::ExecuteSnafu)?,
),
Value::UInt64(v) => Some(
CellIndex::try_from(v)
.map_err(|e| {
BoxedError::new(PlainError::new(
format!("H3 error: {}", e),
StatusCode::EngineExecuteQuery,
))
})
.context(error::ExecuteSnafu)?,
),
_ => None,
};
Ok(cell)
}