Files

221 lines
10 KiB
Python

# Copyright (C) 2018-2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
import argparse
import logging as log
from openvino.preprocess import PrePostProcessor # pylint: disable=no-name-in-module,import-error
# pylint: disable=no-name-in-module,import-error
from openvino import Model, Layout, PartialShape
from openvino.tools.ovc.error import Error
from openvino.tools.ovc.moc_frontend.layout_utils import update_layout_to_dict
from openvino.tools.ovc.utils import refer_to_faq_msg
def check_keys_valid(ov_function: Model, dict_to_validate: dict, search_outputs: bool):
"""
Internal function: checks if keys from cmd line arguments correspond to ov_function's inputs/outputs
Throws if some key is not found
Throws if some different keys point to the same actual input/output
"""
nodes_used = {}
nodes = ov_function.inputs
if search_outputs:
nodes += ov_function.outputs
# We need to replace all node names from dict to tensor names
rename_dict = {}
# Find names for replacing
for name in dict_to_validate.keys():
for ov_node in nodes:
if name in ov_node.get_tensor().get_names():
break
elif name == ov_node.get_node().get_friendly_name():
assert len(ov_node.get_tensor().get_names()) > 0, 'Node must have at least one tensor name'
new_name = list(ov_node.get_tensor().get_names())[0]
rename_dict[name] = new_name
break
# Replace found node names with tensor names
for name, new_name in rename_dict.items():
assert name in dict_to_validate, 'Key {} is not in initial dict'.format(name)
assert new_name not in dict_to_validate, 'Key {} is already in initial dict'.format(new_name)
dict_to_validate[new_name] = dict_to_validate[name]
del dict_to_validate[name]
# validate the dict
for name in dict_to_validate.keys():
node_found = False
for ov_node in nodes:
if name in ov_node.get_tensor().get_names():
if ov_node in nodes_used:
raise Error('Key for {} and {} point to same model input/output.'
.format(name, nodes_used[ov_node]))
nodes_used[ov_node] = name
node_found = True
break
if not node_found:
if not search_outputs:
raise Error('Input with name {} wasn\'t found! {}'.format(name, refer_to_faq_msg(83)))
else:
raise Error('Input/Output with name {} wasn\'t found! {}'.format(name, refer_to_faq_msg(83)))
def update_layout_is_input_flag(ov_function: Model, layout_values: dict):
"""
Internal function: updates layout_values with flag whether each layout belongs to input or to output
"""
for name, layout_value in layout_values.items():
layout_value['is_input'] = False
for ov_input in ov_function.inputs:
if name in ov_input.get_tensor().get_names():
layout_value['is_input'] = True
break
return layout_values
def find_channels_dimension(shape: PartialShape, num_channels: int, name: str, layout_values):
"""
Internal function. Finds dimension index matching with expected channels number
Raises error if there is no candidates or number of candidates is > 1
:param: shape Parameter's partial shape
:param: num_channels Number of channels to find in shape
:param: name Parameter's name, used for Error-handling purposes
:param: layout_values Existing source/target layout items specified by user
:return: updated layout items with guessed layouts
"""
if shape.rank.is_dynamic:
raise Error('Can\'t determine channels dimension for dynamic shape for parameter {}.'
.format(name))
dim_idx_found = -1
for dim_idx in range(shape.rank.get_length()):
dim = shape.get_dimension(dim_idx)
if dim.is_static and dim.get_length() == num_channels:
if dim_idx_found >= 0:
raise Error('Can\'t determine channels dimension for {}. '
'Input shape is {}, needed channels {}. '
'Conflicting dimensions: {} and {}. Please specify layout manually.'
.format(name, shape, num_channels, dim_idx_found, dim_idx))
dim_idx_found = dim_idx
if dim_idx_found < 0:
raise Error('Can\'t determine channels dimension for {}. '
'Input shape is {}, needed channels {}'
.format(name, shape, num_channels))
# Restrict guessed channels index to particular position depending on tensor shape(3d, 4d, 5d)
if shape.rank.get_length() == 3:
# CHW or HWC, possible channels index is 0 or 2
if dim_idx_found != 0 and dim_idx_found != 2:
raise Error('Can\'t determine channels dimension for 3D input {} (CHW or HWC) with shape {}. '
'Please specify layout containing \'C\' channels manually.'.format(name, shape))
elif shape.rank.get_length() == 4:
# NCHW or NHWC, possible channels index is 1 or 3
if dim_idx_found != 1 and dim_idx_found != 3:
raise Error('Can\'t determine channels dimension for 4D input {} (NCHW or NHWC) with shape {}. '
'Please specify layout containing \'C\' channels manually.'.format(name, shape))
elif shape.rank.get_length() == 5:
# NCDHW or NDHWC, possible channels index is 1 or 4
if dim_idx_found != 1 and dim_idx_found != 4:
raise Error('Can\'t determine channels dimension for 5D input {} (NCDHW or NDHWC) with shape {}. '
'Please specify layout containing \'C\' channels manually.'.format(name, shape))
else:
raise Error('Can\'t determine channels dimension for {}D input {} with shape {}.'
'Please specify layout containing \'C\' channels manually.'
.format(shape.rank.get_length(), name, shape))
layout_str = "?" * shape.rank.get_length()
layout_str = layout_str[:dim_idx_found] + 'C' + layout_str[dim_idx_found + 1:]
layout_values[name] = {
'source_layout': layout_str,
'target_layout': None,
'source_guessed': True,
'is_input': True
}
return layout_values
def update_tensor_names_to_first_in_sorted_list(values_dict: dict, ov_function: Model):
if not isinstance(values_dict, dict):
return values_dict
updated_dict = {}
used_nodes = {}
for name, value in values_dict.items():
input_found = False
for input in ov_function.inputs:
tensor_names = list(input.names)
tensor_names.sort()
if not (name in tensor_names or name == input.node.get_friendly_name()):
continue
if input in used_nodes:
raise Error("Tensor names {} and {} refer to the same node.".format(name, used_nodes[input]))
used_nodes.update({input: name})
updated_dict[tensor_names[0]] = value
input_found = True
break
if not input_found:
raise Error('Input with name {} wasn\'t found! {}'.format(name, refer_to_faq_msg(83)))
return updated_dict
def apply_preprocessing(ov_function: Model, argv: argparse.Namespace):
"""
Applies pre-processing of model inputs by adding appropriate operations
On return, 'ov_function' object will be updated
Expected 'argv.mean_scale_values' formats examples:
a) Dict: {'inputName': {'mean': [1., 2., 3.], 'scale': [2., 4., 8.]}}
b) List: list(np.array([(np.array([1., 2., 3.]), np.array([2., 4., 6.])),
(np.array([7., 8., 9.]), np.array([5., 6., 7.])))
Expected 'argv.layout_values' format examples:
a) Specific layouts for inputs and outputs
{ 'input1': {
'source_layout': 'nchw',
'target_layout': 'nhwc'
},
'output2': {
'source_layout': 'nhwc'
}
}
b) Layout for single input: {'': {'source_layout': 'nchw'}}
:param: ov_function OV function for applying mean/scale pre-processing
:param: argv Parsed command line arguments
"""
prep = PrePostProcessor(ov_function)
layout_values = {}
if 'layout_values' in argv and argv.layout_values:
layout_values = update_layout_to_dict(ov_function.inputs, argv.layout_values,
lambda ov_input: ov_input.get_tensor().get_names())
check_keys_valid(ov_function=ov_function, dict_to_validate=layout_values, search_outputs=True)
layout_values = update_layout_is_input_flag(ov_function, layout_values)
for node_name, layout_value in layout_values.items():
if layout_value.get('source_layout'):
if layout_value.get('is_input'):
prep.input(node_name).model().set_layout(Layout(layout_value['source_layout']))
else:
prep.output(node_name).model().set_layout(Layout(layout_value['source_layout']))
if layout_value.get('target_layout'):
if layout_value.get('is_input'):
prep.input(node_name).tensor().set_layout(Layout(layout_value['target_layout']))
else:
prep.output(node_name).tensor().set_layout(Layout(layout_value['target_layout']))
# Apply pre-processing builder to a function
ov_function = prep.build()
# Remove guessed layout values from ov_function (these values shall not be serialized to IR
for node_name, layout_value in layout_values.items():
if layout_value.get('source_guessed') and \
not layout_value.get('target_layout'):
# search for parameter object
for idx, ov_input in enumerate(ov_function.inputs):
if node_name in ov_input.get_tensor().get_names():
log.debug('Clearing guessed layout {} for {}'
.format(layout_value['source_layout'], node_name))
ov_function.get_parameters()[idx].layout = Layout()