# -*- coding: utf-8 -*-
# Copyright (C) 2018-2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""Factory functions for ops added to openvino opset13."""
from functools import partial, singledispatch
from typing import Literal, Optional, Union
import logging
import numpy as np
log = logging.getLogger(__name__)
from openvino import Node, Shape, Type, Output, Tensor
from openvino.op import Constant, Result
from openvino.opset1 import convert_like
from openvino.utils.node_factory import _get_node_factory
from openvino.utils.decorators import binary_op, nameable_op, unary_op, overloading
from openvino.utils.types import (
NumericData,
NodeInput,
NumericType,
as_nodes,
as_node,
)
_get_node_factory_opset13 = partial(_get_node_factory, "opset13")
# -------------------------------------------- ops ------------------------------------------------
@binary_op
def bitwise_and(
    left_node: NodeInput,
    right_node: NodeInput,
    auto_broadcast: str = "NUMPY",
    name: Optional[str] = None,
) -> Node:
    """Create a BitwiseAnd node computing element-wise bitwise conjunction.

    For boolean input tensors the operator is equivalent to logical_and.

    :param left_node: Tensor of integer or boolean datatype providing data.
    :param right_node: Tensor of integer or boolean datatype providing data.
    :param auto_broadcast: Rule used for auto-broadcasting of input tensors. Defaults to "NUMPY".
    :param name: The optional new name for output node.
    :return: The node performing bitwise AND on corresponding input elements.
    """
    attributes = {"auto_broadcast": auto_broadcast.upper()}
    return _get_node_factory_opset13().create("BitwiseAnd", [left_node, right_node], attributes)
@unary_op
def bitwise_not(
    node: NodeInput,
    name: Optional[str] = None,
) -> Node:
    """Create a BitwiseNot node computing element-wise bitwise negation.

    For boolean input tensors the operator is equivalent to logical_not.

    :param node: Tensor of integer or boolean datatype providing data.
    :param name: The optional new name for output node.
    :return: The node performing bitwise NOT on the given tensor.
    """
    factory = _get_node_factory_opset13()
    return factory.create("BitwiseNot", [node])
@binary_op
def bitwise_or(
    left_node: NodeInput,
    right_node: NodeInput,
    auto_broadcast: str = "NUMPY",
    name: Optional[str] = None,
) -> Node:
    """Create a BitwiseOr node computing element-wise bitwise disjunction.

    For boolean input tensors the operator is equivalent to logical_or.

    :param left_node: Tensor of integer or boolean datatype providing data.
    :param right_node: Tensor of integer or boolean datatype providing data.
    :param auto_broadcast: Rule used for auto-broadcasting of input tensors. Defaults to "NUMPY".
    :param name: The optional new name for output node.
    :return: The node performing bitwise OR on corresponding input elements.
    """
    attributes = {"auto_broadcast": auto_broadcast.upper()}
    return _get_node_factory_opset13().create("BitwiseOr", [left_node, right_node], attributes)
@binary_op
def bitwise_xor(
    left_node: NodeInput,
    right_node: NodeInput,
    auto_broadcast: str = "NUMPY",
    name: Optional[str] = None,
) -> Node:
    """Create a BitwiseXor node computing element-wise bitwise exclusive-or.

    For boolean input tensors the operator is equivalent to logical_xor.

    :param left_node: Tensor of integer or boolean datatype providing data.
    :param right_node: Tensor of integer or boolean datatype providing data.
    :param auto_broadcast: Rule used for auto-broadcasting of input tensors. Defaults to "NUMPY".
    :param name: The optional new name for output node.
    :return: The node performing bitwise XOR on corresponding input elements.
    """
    attributes = {"auto_broadcast": auto_broadcast.upper()}
    return _get_node_factory_opset13().create("BitwiseXor", [left_node, right_node], attributes)
@nameable_op
def fake_convert(
    data: NodeInput,
    scale: NodeInput,
    shift: Optional[NodeInput] = None,
    destination_type: Literal["f8e4m3", "f8e5m2"] = "f8e4m3",
    name: Optional[str] = None,
) -> Node:
    """Create a FakeConvert node emulating conversion to a low-precision f8 type.

    .. warning:: FakeConvert is experimental and may change in the future.

    :param data: The node with data tensor with FP16, BF16 or FP32 datatype.
    :param scale: Tensor with a scale factor for the data input value,
                  of the same type as the data, and shape Numpy-broadcastable to data.
    :param shift: Optional tensor with value to subtract before and add after conversion of the
                  data input value, of the same type as the data, and shape Numpy-broadcastable to data.
    :param destination_type: Type to emulate, string of either "f8e4m3" or "f8e5m2".
    :param name: The optional new name for output node.
    :return: The new node performing FakeConvert operation.
    """
    # Shift is the optional third input; only pass it when supplied.
    raw_inputs = [data, scale] if shift is None else [data, scale, shift]
    return _get_node_factory_opset13().create(
        "FakeConvert",
        as_nodes(*raw_inputs, name=name),
        {"destination_type": destination_type},
    )
@nameable_op
def multinomial(
    probs: NodeInput,
    num_samples: NodeInput,
    convert_type: str,
    with_replacement: bool,
    log_probs: bool,
    global_seed: int = 0,
    op_seed: int = 0,
    name: Optional[str] = None,
) -> Node:
    """Return a node which generates a sequence of class indices sampled from the multinomial distribution.

    :param probs: Tensor with probabilities of floating-point type, and shape [batch_size, class_size].
    :param num_samples: Tensor (scalar or 1D) a single element of type i32 or i64,
                        specifying the number of samples to draw from the multinomial distribution.
    :param convert_type: Specifies the output tensor type, possible values: 'i64', 'i32'.
    :param with_replacement: Flag that specifies whether to sample with replacement.
    :param log_probs: Flag that specifies whether *probs* should be treated as unnormalized log probabilities.
    :param global_seed: Specifies global seed value. Required to be a positive integer or 0.
    :param op_seed: Specifies operational seed value. Required to be a positive integer or 0.
    :param name: The optional new name for output node.
    :return: The new node performing Multinomial operation.
    :raises RuntimeError: If either seed is negative.
    """
    # Fail fast: validate the seeds before any graph nodes are constructed,
    # so invalid arguments do not leave behind needlessly created input nodes.
    if global_seed < 0:
        raise RuntimeError(f"global_seed should be positive or 0. Got: {global_seed}")
    if op_seed < 0:
        raise RuntimeError(f"op_seed should be positive or 0. Got: {op_seed}")
    inputs = as_nodes(probs, num_samples, name=name)
    attributes = {
        "convert_type": convert_type,
        "with_replacement": with_replacement,
        "log_probs": log_probs,
        "global_seed": global_seed,
        "op_seed": op_seed,
    }
    return _get_node_factory_opset13().create("Multinomial", inputs, attributes)
@nameable_op
def nms_rotated(
    boxes: NodeInput,
    scores: NodeInput,
    max_output_boxes_per_class: NodeInput,
    iou_threshold: NodeInput,
    score_threshold: NodeInput,
    sort_result_descending: bool = True,
    output_type: str = "i64",
    clockwise: bool = True,
    name: Optional[str] = None,
) -> Node:
    """Create an NMSRotated node performing non-max suppression on rotated boxes.

    :param boxes: Tensor with box coordinates of floating point type and shape
                  [num_batches, num_boxes, 5], where the last dimension is defined as
                  [x_ctr, y_ctr, width, height, angle_radians].
    :param scores: Tensor with box scores of floating point type and shape
                   [num_batches, num_classes, num_boxes].
    :param max_output_boxes_per_class: Tensor (scalar or 1D) of integer type, specifying
                                       maximum number of boxes to be selected per class.
    :param iou_threshold: Tensor (scalar or 1D) of floating point type, specifying
                          intersection over union threshold.
    :param score_threshold: Tensor (scalar or 1D) of floating point type, specifying minimum
                            score to consider box for the processing.
    :param sort_result_descending: Flag that specifies whenever it is necessary to sort selected
                                   boxes across batches or not.
    :param output_type: Output element type.
    :param clockwise: Flag that specifies direction of the box rotation.
    :param name: The optional new name for output node.
    :return: The new node which performs NMSRotated.
    """
    node_inputs = as_nodes(
        boxes,
        scores,
        max_output_boxes_per_class,
        iou_threshold,
        score_threshold,
        name=name,
    )
    return _get_node_factory_opset13().create(
        "NMSRotated",
        node_inputs,
        {
            "sort_result_descending": sort_result_descending,
            "output_type": output_type,
            "clockwise": clockwise,
        },
    )
@nameable_op
def scaled_dot_product_attention(
    query: NodeInput,
    key: NodeInput,
    value: NodeInput,
    attention_mask: Optional[NodeInput] = None,
    scale: Optional[NodeInput] = None,
    causal: bool = False,
    name: Optional[str] = None,
) -> Node:
    """Create a ScaledDotProductAttention node.

    :param query: Query tensor of shape [N, ..., L, E] and floating-point datatype.
    :param key: Key tensor of shape [N, ..., S, E] and floating-point datatype.
    :param value: Value tensor of shape [N, ..., S, Ev] and floating-point datatype.
    :param attention_mask: Optional attention mask tensor of shape [N, ..., L, S] or scalar
                           float type zero value. Refer to the operation specification for
                           a complete description.
    :param scale: Optional alternative scale, a floating-point type scalar.
    :param causal: If true, then autogenerates causal attention mask instead of using
                   attention_mask input. In this case attention_mask input is ignored.
    :param name: The optional new name for output node.
    :return: The new node performing Scaled Dot Product Attention operation.
    """
    op_inputs = as_nodes(query, key, value, name=name)
    if attention_mask is not None:
        op_inputs.append(as_node(attention_mask, name=name))
    elif scale is not None:
        # Scale occupies the 5th input slot, so when no mask is given a zero
        # placeholder (converted to the query's element type) fills slot 4.
        placeholder = convert_like(constant(np.array(0, np.int32)), op_inputs[0])
        op_inputs.append(as_node(placeholder, name=name))
    if scale is not None:
        op_inputs.append(as_node(scale, name=name))
    return _get_node_factory_opset13().create(
        "ScaledDotProductAttention", op_inputs, {"causal": causal},
    )
@overloading(Union[NumericData, np.number, bool, np.bool_, list], Union[NumericType, Type], Optional[str], bool)  # type: ignore
@nameable_op
def constant(
    value: Union[NumericData, np.number, bool, np.bool_, list],
    dtype: Optional[Union[NumericType, Type]] = None,
    name: Optional[str] = None,
    *,
    shared_memory: bool = False,
) -> Constant:
    """Create a Constant node from provided value.

    :param value: One of: array of values or scalar to initialize node with.
    :param dtype: The data type of provided data.
        If dtype does not match, data will be converted.
        Note: disables sharing of the memory when conversion occurs.
    :param name: Optional name for output node.
    :param shared_memory: keyword-only argument.
        If `True`, this Constant's memory is being shared with a host,
        that means the responsibility of keeping host memory is
        on the side of a user. Any action performed on the host
        memory is reflected on this Constant's memory!
        If `False`, data is being copied to this Constant.
        Requires data to be C_CONTIGUOUS if `True`.
        Disabled by default if:

        - value is a scalar.
        - dtype is one of: Type.u1, Type.i4, Type.u4, Type.nf4, Type.bf16.
        - dtype forces conversion of data.
    :return: The Constant node initialized with provided data.
    """
    def display_shared_memory_warning(warning_message: str) -> None:
        # Warn only when the caller explicitly asked for shared memory but it
        # cannot be honored for this combination of value and dtype.
        if shared_memory:
            log.warning(f"{warning_message}. Memory sharing is disabled by default. Set shared_memory=False to hide this warning.")
    if isinstance(value, np.ndarray):
        _value, _shared_memory = value, shared_memory
    else:
        # Scalars/lists are materialized as a new numpy array; sharing cannot
        # apply to the freshly allocated copy.
        _value, _shared_memory = np.array(value), False
        display_shared_memory_warning(f"Converting scalar to corresponding type of {_value.dtype}")
    # Handle type casting, when dtype is not None:
    if dtype:
        # Expect packed data, use different constructor to handle it correctly:
        if dtype in [Type.u1, Type.i4, Type.u4, Type.nf4, Type.f4e2m1]:
            display_shared_memory_warning(f"Constant initialized with packed type of {dtype}")
            return Constant(dtype, Shape(_value.shape), _value.flatten().tolist())
        elif dtype in [Type.bf16, Type.f8e8m0, Type.f8e4m3, Type.f8e5m2]:
            # OpenVINO custom types have no numpy equivalent; go through the
            # list-based constructor as well.
            display_shared_memory_warning(f"Constant initialized with OpenVINO custom {dtype}")
            return Constant(dtype, Shape(_value.shape), _value.flatten().tolist())
        # General use-case for all other types:
        else:
            _dtype = dtype.to_dtype() if isinstance(dtype, Type) else dtype
            if _dtype is int:
                display_shared_memory_warning("Converting scalar type of undefined bitwidth to 32-bit integer")
                _value, _shared_memory = _value.astype(np.int32), False
            elif _dtype is float:
                display_shared_memory_warning("Converting scalar type of undefined bitwidth to 32-bit float")
                _value, _shared_memory = _value.astype(np.float32), False
            elif _dtype is bool:
                display_shared_memory_warning("Converting bool type to numpy bool")
                _value, _shared_memory = _value.astype(np.bool_), False
            else:
                # Any dtype mismatch requires an astype copy, which also
                # disables memory sharing.
                if _dtype != _value.dtype:
                    display_shared_memory_warning(f"Converting value of {_value.dtype} to {_dtype}")
                    _value, _shared_memory = _value.astype(_dtype), False
    # Create Constant itself:
    return Constant(_value, shared_memory=_shared_memory)
@overloading(Tensor, bool, Optional[str])  # type: ignore
@nameable_op
def constant(  # noqa: F811
    tensor: Tensor,
    shared_memory: bool = True,
    name: Optional[str] = None,
) -> Constant:
    """Create a Constant node directly from an existing Tensor.

    :param tensor: Tensor supplying the constant's data.
    :param shared_memory: If True, the Constant shares the tensor's memory.
    :param name: Optional name for output node.
    :return: The Constant node wrapping the tensor data.
    """
    node = Constant(tensor, shared_memory=shared_memory)
    return node
@unary_op
def result(data: Union[Node, Output, NumericData], name: Optional[str] = None) -> Node:
    """Return a node which represents an output of a graph (Model).

    :param data: The tensor containing the input data.
    :param name: The optional new name for output node.
    :return: Result node.
    """
    # A Node must be unwrapped to its first output before building the Result.
    source = data.output(0) if isinstance(data, Node) else data
    return Result(source)
@nameable_op
def fake_quantize(
    data: NodeInput,
    input_low: NodeInput,
    input_high: NodeInput,
    output_low: NodeInput,
    output_high: NodeInput,
    levels: int,
    auto_broadcast: str = "NUMPY",
    name: Optional[str] = None,
) -> Node:
    r"""Perform an element-wise linear quantization on input data.

    :param data: The node with data tensor.
    :param input_low: The node with the minimum for input values.
    :param input_high: The node with the maximum for input values.
    :param output_low: The node with the minimum quantized value.
    :param output_high: The node with the maximum quantized value.
    :param levels: The number of quantization levels. Integer value.
    :param auto_broadcast: The type of broadcasting specifies rules used for
                           auto-broadcasting of input tensors.
    :param name: Optional name of the new node.
    :return: New node with quantized value.

    Input floating point values are quantized into a discrete set of floating point values.

    .. code-block:: python

        if x <= input_low:
            output = output_low
        elif x > input_high:
            output = output_high
        else:
            output = quantize(x)

    Fake quantize uses the following logic:

    \f[ output =
            \dfrac{round\left(\dfrac{data - input\_low}{input\_high - input\_low}
            \cdot (levels-1)\right)}{levels-1} \cdot (output\_high - output\_low)
            + output\_low \f]
    """
    quantize_inputs = as_nodes(data, input_low, input_high, output_low, output_high, name=name)
    attributes = {"levels": levels, "auto_broadcast": auto_broadcast.upper()}
    return _get_node_factory_opset13().create("FakeQuantize", quantize_inputs, attributes)