# Copyright (C) 2018-2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

# flake8: noqa
# mypy: ignore-errors
|
import logging as log
import numpy as np
import sys
from openvino import PartialShape, Dimension, Type
from packaging.version import parse, Version
from typing import Union
|
# TODO: reuse this method in ovc and remove duplication
|
|
# TODO: reuse this method in ovc and remove duplication
def get_static_shape(shape: "Union[PartialShape, list, tuple]", dynamic_value=None):
    """
    Convert a shape description into a tuple of static dimensions.

    For dynamic dimensions, the lower boundary is used if it is set, otherwise
    the upper boundary if it is set. A fully dynamic dimension is replaced
    with `dynamic_value`.

    :param shape: iterable of dimensions; each may be int, np.int64,
                  a (min_length, max_length) tuple (MO CLI parser format),
                  or an openvino Dimension
    :param dynamic_value: value substituted for fully dynamic dimensions
    :return: tuple of static dimensions
    :raises Exception: for an unknown dimension type
    """
    shape_list = []
    for dim in shape:
        if isinstance(dim, (int, np.int64)):
            # -1 denotes a fully dynamic dimension
            shape_list.append(dynamic_value if dim == -1 else dim)
        elif isinstance(dim, tuple):
            # tuple where (min_length, max_length), the format which uses MO cli parser
            assert len(dim) == 2, "Unknown dimension type {}".format(dim)
            if dim[0] > 0:
                shape_list.append(dim[0])
            elif dim[1] < np.iinfo(np.int64).max:
                shape_list.append(dim[1])
            else:
                shape_list.append(dynamic_value)
        elif isinstance(dim, Dimension):
            if dim.is_static or dim.get_min_length() > 0:
                shape_list.append(dim.get_min_length())
            elif dim.get_max_length() != -1:
                shape_list.append(dim.get_max_length())
            else:
                shape_list.append(dynamic_value)
        else:
            raise Exception("Unknown dimension type {}".format(dim))

    return tuple(shape_list)
|
|
|
|
|
|
def get_imported_module_version(imported_module):
    """
    Get imported module version.

    Checks the common version attributes in order and returns the first
    one whose value is a string.
    :return: version (str) or raise AttributeError exception
    """
    for attr_name in ("__version__", "VERSION", "version"):
        candidate = getattr(imported_module, attr_name, None)
        if isinstance(candidate, str):
            return candidate

    # none of the known attributes held a string version
    raise AttributeError("{} module doesn't have version attribute".format(imported_module))
|
|
|
|
|
|
# TODO: reuse this method in ovc and remove duplication
|
|
# TODO: reuse this method in ovc and remove duplication
def get_environment_setup(framework):
    """
    Get environment setup such as Python version, TensorFlow version
    :param framework: framework name
    :return: a dictionary of environment variables
    """
    env_setup = dict()
    python_version = "{}.{}.{}".format(sys.version_info.major,
                                       sys.version_info.minor,
                                       sys.version_info.micro)
    env_setup['python_version'] = python_version
    try:
        if framework == 'tf':
            # plain local import replaces the previous exec("import tensorflow"),
            # which hid the import from static analysis for no benefit
            import tensorflow
            env_setup['tensorflow'] = get_imported_module_version(tensorflow)
            del tensorflow
    except (AttributeError, ImportError):
        # best-effort: leave the 'tensorflow' key absent when TF is missing
        # or its version cannot be determined
        pass
    env_setup['sys_platform'] = sys.platform
    return env_setup
|
|
|
|
|
|
def trace_tf_model_if_needed(input_model, placeholder_shapes, placeholder_data_types, example_input):
    """Trace `input_model` if its type requires tracing; otherwise return it unchanged."""
    import tensorflow as tf
    # only these model kinds need to be traced into a concrete function
    traceable_types = (tf.keras.layers.Layer, tf.Module, tf.keras.Model,
                       tf.types.experimental.GenericFunction)
    if isinstance(input_model, traceable_types):
        return trace_tf_model(input_model, placeholder_shapes, placeholder_data_types, example_input)
    return input_model
|
|
|
|
|
|
def partial_shape_to_list(partial_shape: PartialShape):
    """Convert a PartialShape to a list; dynamic dims become None, dynamic rank becomes None."""
    # a fully dynamic rank cannot be expressed as a list at all
    if partial_shape.rank.is_dynamic:
        return None
    return [dim.get_length() if dim.is_static else None for dim in partial_shape]
|
|
|
|
|
|
def get_input_spec_from_model(model, input_shapes=None):
    """Derive a list of tf.TensorSpec for tracing from the model build shape or given input shapes."""
    import tensorflow as tf
    build_shape = getattr(model, "_build_input_shape", None)
    if build_shape is not None:
        # prefer the shape the Keras model was built with
        if isinstance(build_shape, list):
            return [[tf.TensorSpec(shape) for shape in build_shape]]
        return [tf.TensorSpec(build_shape)]
    if input_shapes and isinstance(input_shapes, list) and len(input_shapes) > 0:
        specs = []
        for input_shape in input_shapes:
            if isinstance(input_shape, PartialShape):
                specs.append(tf.TensorSpec(partial_shape_to_list(input_shape)))
            else:
                # unknown shape description: leave the spec fully undefined
                specs.append(tf.TensorSpec(None))
        return specs
    # no shape information available at all
    return [tf.TensorSpec(None)]
|
|
|
|
|
|
def get_concrete_func(tf_function, example_input, input_needs_packing, error_message, use_example_input=True):
    """
    Runs tracing of TF function and returns a concrete function.

    :param tf_function: TF function that needs to be traced.
    :param example_input: Example of function input.
    :param input_needs_packing: determines if input needs to be packed in a list before passing to TF function.
    It is used when original function was wrapped in outer TF function, which changes function signature.
    In this case wrapper TF function always expects list of inputs which are unpacked inside subfunction.
    So list/tuple are treated as multiple inputs of original model.
    Non list/tuple are treated as single input, and it needs packing to a list,
    as wrapper function always expect list of inputs.
    :param error_message: Error message which should be shown in case of tracing error.
    :param use_example_input: Determines if example_input should be used.

    :returns: Object of type tf.types.experimental.ConcreteFunction.
    """
    if input_needs_packing and not isinstance(example_input, (list, tuple)):
        example_input = [example_input]
    try:
        if use_example_input:
            if not input_needs_packing and isinstance(example_input, (list, tuple)):
                # multiple inputs of the original model: unpack into positional args
                concrete_func = tf_function.get_concrete_function(*example_input)
            else:
                concrete_func = tf_function.get_concrete_function(example_input)
        else:
            # rely on the function's own input_signature
            concrete_func = tf_function.get_concrete_function()
    except Exception as e:
        # chain the original exception so the tracing failure cause is preserved
        raise Exception(error_message.format(e)) from e
    return concrete_func
|
|
|
|
|
|
def get_signature_from_input(keras_model):
    """Recover an input signature from the model's 'input' attribute, renaming specs from 'input_spec'."""
    import tensorflow as tf
    # no 'input' attribute (or it is None) means no signature can be derived
    if getattr(keras_model, 'input', None) is None:
        return None
    keras_input_signature = getattr(keras_model, 'input')
    # align input names for the signature
    input_spec = getattr(keras_model, 'input_spec', None)
    if input_spec is not None:
        input_names = [spec.name for spec in input_spec]
        if not isinstance(keras_input_signature, (dict, list)):
            # scalar case: treat as a single-element list
            keras_input_signature = [keras_input_signature]
        if isinstance(keras_input_signature, list) and len(keras_input_signature) == len(input_names):
            # rebuild the specs with the names coming from input_spec
            return [
                tf.TensorSpec(shape=elem.shape, dtype=elem.dtype, name=name)
                for elem, name in zip(keras_input_signature, input_names)
            ]
    return keras_input_signature
|
|
|
|
|
|
def get_signature_from_input_signature(keras_model):
    """Return the model's explicit 'input_signature' attribute, or None when absent or unset."""
    return getattr(keras_model, 'input_signature', None)
|
|
|
|
|
|
def create_generic_function_from_keras_model(keras_model):
    """
    Wrap a Keras model (or tf.Module) into a tf.function with an explicit input signature.

    The signature is recovered from the model's 'input' / 'input_signature'
    attributes. Returns the wrapping tf.function, or None when no input
    signature can be derived.

    :param keras_model: tf.keras.Model or tf.Module instance
    :return: tf.function wrapper or None
    """
    import tensorflow as tf
    assert isinstance(keras_model, (tf.keras.Model, tf.Module)), \
        "[TensorFlow Frontend] internal error: the input model must be of tf.keras.Model or tf.Module model type"

    keras_input_signature = get_signature_from_input(keras_model)
    if keras_input_signature is None:
        keras_input_signature = get_signature_from_input_signature(keras_model)
    if keras_input_signature is None:
        return None
    tf_input_signature = None
    wrapper_function = None
    if isinstance(keras_input_signature, dict):
        # dictionary of tensor specs keyed by input name
        tf_input_signature = []
        for tensor_name, tensor_spec in keras_input_signature.items():
            tf_input_signature.append(tf.TensorSpec(shape=tensor_spec.shape,
                                                    dtype=tensor_spec.dtype,
                                                    name=tensor_name))
    elif isinstance(keras_input_signature, list):
        tf_input_signature = []
        for tensor_spec in keras_input_signature:
            tf_input_signature.append(tf.TensorSpec(shape=tensor_spec.shape,
                                                    dtype=tensor_spec.dtype,
                                                    name=tensor_spec.name))
    else:
        try:
            # single KerasTensor case
            tf_input_signature = []
            tf_input_signature.append(tf.TensorSpec(shape=keras_input_signature.shape,
                                                    dtype=keras_input_signature.dtype,
                                                    name=keras_input_signature.name))
        # narrowed from bare 'except:' so KeyboardInterrupt/SystemExit propagate
        except Exception:
            tf_input_signature = None
    if tf_input_signature is not None:
        @tf.function(input_signature=tf_input_signature)
        def wrapper_function_dict(*args):
            if isinstance(keras_input_signature, list):
                outputs = keras_model(args)
            else:
                # dict-signature models are called with a name->tensor mapping
                input_dict = {}
                for ind, tensor_spec in enumerate(tf_input_signature):
                    input_dict[tensor_spec.name] = args[ind]
                outputs = keras_model(input_dict)
            # need to wrap the output into dictionary
            # it helps to preserve original keras tensor names
            post_outputs = {}
            if isinstance(outputs, dict):
                for output_name, output_value in outputs.items():
                    post_outputs[output_name] = output_value
            else:
                try:
                    if isinstance(outputs, list) and isinstance(keras_model.outputs, list) and \
                            len(outputs) == len(keras_model.outputs):
                        for output_value, output_tensor in zip(outputs, keras_model.outputs):
                            post_outputs[output_tensor.name] = output_value
                    else:
                        post_outputs[keras_model.output.name] = outputs
                # narrowed from bare 'except:'; models without named outputs
                # return their outputs untouched
                except Exception:
                    post_outputs = outputs
            return post_outputs

        wrapper_function = wrapper_function_dict
    return wrapper_function
|
|
|
|
|
|
def trace_tf_model(model, input_shapes, input_types, example_input):
    """
    Trace a TF model into a concrete function.

    Selects the proper tf.function to trace (the model itself, its __call__,
    a generated wrapper with an explicit signature, or a generic wrapper)
    and traces it using example_input or input shape information.

    :param model: TF model / module / function to trace
    :param input_shapes: optional shapes used when no example input is given
    :param input_types: optional input types (currently unused here)
    :param example_input: optional example input used for tracing
    :return: tf.types.experimental.ConcreteFunction
    """
    import tensorflow as tf
    if isinstance(model.__call__, tf.types.experimental.GenericFunction):
        tf_function = model.__call__
        input_needs_packing = False
    elif isinstance(model, tf.types.experimental.GenericFunction):
        tf_function = model
        input_needs_packing = False
    elif isinstance(model, (tf.keras.Model, tf.Module)):
        tf_function = create_generic_function_from_keras_model(model)
        if tf_function is not None:
            input_needs_packing = False
        else:
            # Wrap model to tf.Function.
            # In this case we loose input/output tensor names.
            @tf.function
            def tf_function(args):
                return model(*args)

            input_needs_packing = True
    else:
        # Wrap model to tf.Function.
        # In this case we loose input/output tensor names.
        @tf.function
        def tf_function(args):
            return model(*args)

        input_needs_packing = True

    def are_shapes_defined(shape: Union[list, dict]):
        # Returns True when every dimension entry of 'shape' is defined.
        # Fix: inspect the 'shape' argument itself; the previous implementation
        # iterated the enclosing 'input_shapes' variable in both branches,
        # shadowing the parameter and ignoring what was passed in.
        if shape is None:
            return False
        assert hasattr(shape, '__len__')
        if len(shape) == 0:
            return False

        if isinstance(shape, list):
            return np.all([dim is not None for dim in shape])
        elif isinstance(shape, dict):
            return np.all([dim is not None for dim in shape.values()])

    if example_input is not None:
        concrete_func = get_concrete_func(tf_function, example_input, input_needs_packing,
                                          "Could not trace the TF model with the following error: {}")
    else:
        if isinstance(tf_function, tf.types.experimental.GenericFunction) and \
                tf_function.input_signature is not None:
            # the function carries its own signature; no example input required
            concrete_func = get_concrete_func(tf_function, None, input_needs_packing,
                                              "Could not trace the TF model with the following error: {}",
                                              use_example_input=False)
        else:
            input_spec = get_input_spec_from_model(model, input_shapes)
            concrete_func = get_concrete_func(tf_function, input_spec, input_needs_packing,
                                              "Could not trace the TF model with the following error: {}.\n"
                                              "Please provide 'example_input'.")

    return concrete_func
|
|
|
|
|
|
def type_supported_by_tf_fe(input_model):
    """Return True when the TF frontend can handle this model type (directly or after tracing)."""
    import tensorflow as tf
    # Types that require tracing
    traced_types = (tf.keras.layers.Layer, tf.Module, tf.keras.Model,
                    tf.types.experimental.GenericFunction)
    # Types that do not require tracing
    direct_types = (tf.Graph, tf.types.experimental.ConcreteFunction)
    if isinstance(input_model, traced_types) or isinstance(input_model, direct_types):
        return True
    # GraphIterator
    return model_is_graph_iterator(input_model)
|
|
|
|
|
|
def is_variable(func_input, captures):
    """Return True when a concrete-function input is a variable: a resource tensor or a captured tensor."""
    import tensorflow as tf
    if func_input.dtype == tf.resource:
        return True
    # captured inputs are compared by object identity with the capture's inner tensor
    return any(func_input is capture[1] for capture in captures)
|
|
|
|
|
|
def create_tf_graph_iterator(input_model, placeholder_shapes, placeholder_data_types, example_input, share_weights):
    """
    Wrap 'input_model' into a GraphIteratorTFGraph for the TF frontend.

    The model is traced first when its type requires it. Accepts an existing
    GraphIterator (returned as-is), a tf.Graph, or a ConcreteFunction; for a
    ConcreteFunction, maps between internal tensor names and the external
    names of the structured input/output signature.

    :param input_model: model object to wrap
    :param placeholder_shapes: shapes forwarded to tracing
    :param placeholder_data_types: types forwarded to tracing
    :param example_input: example input forwarded to tracing
    :param share_weights: whether the iterator shares weights with the model
    :return: GraphIteratorTFGraph instance
    :raises Exception: for unsupported model types
    """
    input_model = trace_tf_model_if_needed(input_model, placeholder_shapes, placeholder_data_types, example_input)

    import tensorflow as tf
    from openvino.frontend.tensorflow.graph_iterator import GraphIteratorTFGraph
    if model_is_graph_iterator(input_model):
        # already wrapped; nothing to do
        return input_model
    if isinstance(input_model, tf.Graph):
        return GraphIteratorTFGraph(input_model, share_weights)
    elif isinstance(input_model, tf.types.experimental.ConcreteFunction):
        # create a map for inputs to map internal tensor name to external one
        # collect all internal tensor names in a given order
        input_names_map = None
        if hasattr(input_model, 'inputs') and hasattr(input_model, 'structured_input_signature'):
            internal_tensor_names = []
            for func_input in input_model.inputs:
                # variables (resource / captured tensors) are not user inputs
                if is_variable(func_input, input_model.graph.captures):
                    continue
                internal_tensor_names.append(func_input.name)
            if len(input_model.structured_input_signature) > 0 and \
                    len(internal_tensor_names) == len(input_model.structured_input_signature[0]):
                # positional part of the signature: match by order
                for internal_name, tensor_spec in zip(internal_tensor_names, input_model.structured_input_signature[0]):
                    input_names_map = input_names_map or {}
                    if not isinstance(tensor_spec, tf.TensorSpec):
                        # non-TensorSpec entry: the mapping cannot be trusted, drop it entirely
                        input_names_map = None
                        break
                    input_names_map[internal_name] = tensor_spec.name
            elif len(input_model.structured_input_signature) > 1 and \
                    len(internal_tensor_names) == len(input_model.structured_input_signature[1]):
                # keyword part of the signature: sorted external names are paired
                # with internal names in order — assumes both orders align; TODO confirm
                external_tensor_names = sorted(input_model.structured_input_signature[1].keys())
                for internal_name, external_name in zip(internal_tensor_names, external_tensor_names):
                    input_names_map = input_names_map or {}
                    input_names_map[internal_name] = external_name

        output_names_map = None
        if hasattr(input_model, 'outputs') and hasattr(input_model, 'structured_outputs') and \
                isinstance(input_model.structured_outputs, dict):
            external_names = sorted(list(input_model.structured_outputs.keys()))
            internal_names = [tensor.name for tensor in input_model.outputs]
            if len(external_names) == len(internal_names):
                # assumes sorted external names align with output order — TODO confirm
                for external_name, internal_name in zip(external_names, internal_names):
                    output_names_map = output_names_map or {}
                    output_names_map[internal_name] = external_name
            else:
                for external_name, internal_tensor in input_model.structured_outputs.items():
                    internal_tf_tensor = None
                    if isinstance(internal_tensor, tf.Tensor):
                        internal_tf_tensor = internal_tensor
                    if isinstance(internal_tensor, list) and len(internal_tensor) > 0 and \
                            isinstance(internal_tensor[0], tf.Tensor):
                        # lists of tensors are represented by their first element
                        internal_tf_tensor = internal_tensor[0]
                    if internal_tf_tensor is None:
                        # unsupported structured-output entry: drop the whole map
                        output_names_map = None
                        break
                    output_names_map = output_names_map or {}
                    output_names_map[internal_tf_tensor.name] = external_name
        return GraphIteratorTFGraph(input_model.graph, share_weights, False, input_names_map, output_names_map)
    raise Exception("Could not wrap model of type {} to GraphIteratorTFGraph.".format(type(input_model)))
|
|
|
|
|
|
def extract_model_graph(argv):
    """
    Normalize argv["input_model"] to a convertible graph/model object.

    Recognized input types may be replaced in place (argv["input_model"] is
    set to an inner graph or signature). Returns True when the model type is
    supported, False otherwise.

    :param argv: dict with an "input_model" entry
    :return: bool, whether the model type is supported
    :raises Exception: for unknown checkpoint formats or Trackable objects
                       without a usable signature
    """
    model = argv["input_model"]
    import tensorflow as tf
    trackable_is_imported = False
    try:
        from tensorflow.python.training.tracking.base import Trackable  # pylint: disable=no-name-in-module,import-error
        trackable_is_imported = True
    # narrowed from bare 'except:' so KeyboardInterrupt/SystemExit propagate
    except Exception:
        try:
            # newer TF versions moved Trackable to tensorflow.python.trackable
            from tensorflow.python.trackable.base import Trackable
            trackable_is_imported = True
        except Exception:
            log.warning("Could not import tensorflow.python.training.tracking.base.Trackable type.")
    env_setup = get_environment_setup("tf")
    if isinstance(model, tf.Graph):
        return True
    if isinstance(model, tf.compat.v1.GraphDef):
        graph = tf.Graph()
        with graph.as_default():
            tf.graph_util.import_graph_def(model, name='')
        argv["input_model"] = graph
        return True
    if isinstance(model, tf.compat.v1.Session):
        argv["input_model"] = model.graph
        return True
    # "tensorflow" may be absent from env_setup when the TF version could not
    # be detected; use .get() to avoid a KeyError in that case
    if env_setup.get("tensorflow") is not None and \
            Version(env_setup["tensorflow"]) >= parse("2.6.0") and \
            isinstance(model, (tf.types.experimental.GenericFunction,
                               tf.types.experimental.ConcreteFunction)):
        return True
    if isinstance(model, tf.train.Checkpoint):
        if isinstance(model.root, tf.keras.Model):
            argv["input_model"] = model.root
            return True
        else:
            raise Exception("Unknown checkpoint format.")

    if isinstance(model, (tf.keras.layers.Layer, tf.Module, tf.keras.Model)):
        return True
    if trackable_is_imported and isinstance(model, Trackable):
        if hasattr(model, "signatures") and len(model.signatures.items()):
            if "serving_default" in model.signatures:
                argv["input_model"] = model.signatures["serving_default"]
            elif "default" in model.signatures:
                argv["input_model"] = model.signatures["default"]
            else:
                # no default signature: take the first one available
                for signature_name in model.signatures:
                    argv["input_model"] = model.signatures[signature_name]
                    log.warning("Could not find the default signature. "
                                "The following signature was used for conversion: {}".format(signature_name))
                    break

        elif hasattr(model, "graph"):
            argv["input_model"] = model.graph
        else:
            raise Exception("Could not find signature of graph in a Trackable object.")
        return True
    if model_is_graph_iterator(model):
        return True
    return False
|
|
|
|
|
|
def model_is_graph_iterator(model):
    """
    Check whether 'model' is an OpenVINO GraphIteratorTFGraph instance.

    Returns False when the TF-frontend graph iterator cannot be imported.
    """
    try:
        from openvino.frontend.tensorflow.graph_iterator import GraphIteratorTFGraph
    # narrowed from bare 'except:' so KeyboardInterrupt/SystemExit propagate
    except Exception:
        return False
    return isinstance(model, GraphIteratorTFGraph)
|
|
|
|
|
|
def tf_type_to_ov_type(val):
    """Map a TF dtype to the corresponding OpenVINO element type."""
    import tensorflow as tf  # pylint: disable=import-error
    if not isinstance(val, tf.dtypes.DType):
        raise Exception("The provided type is not a TF type {}.".format(val))

    tf_to_ov_type = {
        tf.float32: Type.f32,
        tf.float16: Type.f16,
        tf.float64: Type.f64,
        tf.bfloat16: Type.bf16,
        tf.uint8: Type.u8,
        tf.int8: Type.i8,
        tf.int16: Type.i16,
        tf.int32: Type.i32,
        tf.int64: Type.i64,
        tf.bool: Type.boolean,
        tf.string: Type.string
    }
    ov_type = tf_to_ov_type.get(val)
    if ov_type is None:
        raise Exception("The provided data type is not supported by OpenVino {}.".format(val))
    return ov_type
|