221 lines
10 KiB
Python
221 lines
10 KiB
Python
# Copyright (C) 2018-2025 Intel Corporation
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
|
|
import argparse
|
|
import logging as log
|
|
|
|
from openvino.preprocess import PrePostProcessor # pylint: disable=no-name-in-module,import-error
|
|
# pylint: disable=no-name-in-module,import-error
|
|
from openvino import Model, Layout, PartialShape
|
|
from openvino.tools.ovc.error import Error
|
|
from openvino.tools.ovc.moc_frontend.layout_utils import update_layout_to_dict
|
|
from openvino.tools.ovc.utils import refer_to_faq_msg
|
|
|
|
|
|
def check_keys_valid(ov_function: Model, dict_to_validate: dict, search_outputs: bool):
|
|
"""
|
|
Internal function: checks if keys from cmd line arguments correspond to ov_function's inputs/outputs
|
|
Throws if some key is not found
|
|
Throws if some different keys point to the same actual input/output
|
|
"""
|
|
nodes_used = {}
|
|
nodes = ov_function.inputs
|
|
if search_outputs:
|
|
nodes += ov_function.outputs
|
|
|
|
# We need to replace all node names from dict to tensor names
|
|
rename_dict = {}
|
|
# Find names for replacing
|
|
for name in dict_to_validate.keys():
|
|
for ov_node in nodes:
|
|
if name in ov_node.get_tensor().get_names():
|
|
break
|
|
elif name == ov_node.get_node().get_friendly_name():
|
|
assert len(ov_node.get_tensor().get_names()) > 0, 'Node must have at least one tensor name'
|
|
new_name = list(ov_node.get_tensor().get_names())[0]
|
|
rename_dict[name] = new_name
|
|
break
|
|
|
|
# Replace found node names with tensor names
|
|
for name, new_name in rename_dict.items():
|
|
assert name in dict_to_validate, 'Key {} is not in initial dict'.format(name)
|
|
assert new_name not in dict_to_validate, 'Key {} is already in initial dict'.format(new_name)
|
|
dict_to_validate[new_name] = dict_to_validate[name]
|
|
del dict_to_validate[name]
|
|
|
|
# validate the dict
|
|
for name in dict_to_validate.keys():
|
|
node_found = False
|
|
for ov_node in nodes:
|
|
if name in ov_node.get_tensor().get_names():
|
|
if ov_node in nodes_used:
|
|
raise Error('Key for {} and {} point to same model input/output.'
|
|
.format(name, nodes_used[ov_node]))
|
|
nodes_used[ov_node] = name
|
|
node_found = True
|
|
break
|
|
|
|
if not node_found:
|
|
if not search_outputs:
|
|
raise Error('Input with name {} wasn\'t found! {}'.format(name, refer_to_faq_msg(83)))
|
|
else:
|
|
raise Error('Input/Output with name {} wasn\'t found! {}'.format(name, refer_to_faq_msg(83)))
|
|
|
|
|
|
def update_layout_is_input_flag(ov_function: Model, layout_values: dict):
|
|
"""
|
|
Internal function: updates layout_values with flag whether each layout belongs to input or to output
|
|
"""
|
|
for name, layout_value in layout_values.items():
|
|
layout_value['is_input'] = False
|
|
for ov_input in ov_function.inputs:
|
|
if name in ov_input.get_tensor().get_names():
|
|
layout_value['is_input'] = True
|
|
break
|
|
return layout_values
|
|
|
|
|
|
def find_channels_dimension(shape: PartialShape, num_channels: int, name: str, layout_values):
|
|
"""
|
|
Internal function. Finds dimension index matching with expected channels number
|
|
Raises error if there is no candidates or number of candidates is > 1
|
|
:param: shape Parameter's partial shape
|
|
:param: num_channels Number of channels to find in shape
|
|
:param: name Parameter's name, used for Error-handling purposes
|
|
:param: layout_values Existing source/target layout items specified by user
|
|
:return: updated layout items with guessed layouts
|
|
"""
|
|
if shape.rank.is_dynamic:
|
|
raise Error('Can\'t determine channels dimension for dynamic shape for parameter {}.'
|
|
.format(name))
|
|
|
|
dim_idx_found = -1
|
|
for dim_idx in range(shape.rank.get_length()):
|
|
dim = shape.get_dimension(dim_idx)
|
|
if dim.is_static and dim.get_length() == num_channels:
|
|
if dim_idx_found >= 0:
|
|
raise Error('Can\'t determine channels dimension for {}. '
|
|
'Input shape is {}, needed channels {}. '
|
|
'Conflicting dimensions: {} and {}. Please specify layout manually.'
|
|
.format(name, shape, num_channels, dim_idx_found, dim_idx))
|
|
dim_idx_found = dim_idx
|
|
if dim_idx_found < 0:
|
|
raise Error('Can\'t determine channels dimension for {}. '
|
|
'Input shape is {}, needed channels {}'
|
|
.format(name, shape, num_channels))
|
|
|
|
# Restrict guessed channels index to particular position depending on tensor shape(3d, 4d, 5d)
|
|
if shape.rank.get_length() == 3:
|
|
# CHW or HWC, possible channels index is 0 or 2
|
|
if dim_idx_found != 0 and dim_idx_found != 2:
|
|
raise Error('Can\'t determine channels dimension for 3D input {} (CHW or HWC) with shape {}. '
|
|
'Please specify layout containing \'C\' channels manually.'.format(name, shape))
|
|
elif shape.rank.get_length() == 4:
|
|
# NCHW or NHWC, possible channels index is 1 or 3
|
|
if dim_idx_found != 1 and dim_idx_found != 3:
|
|
raise Error('Can\'t determine channels dimension for 4D input {} (NCHW or NHWC) with shape {}. '
|
|
'Please specify layout containing \'C\' channels manually.'.format(name, shape))
|
|
elif shape.rank.get_length() == 5:
|
|
# NCDHW or NDHWC, possible channels index is 1 or 4
|
|
if dim_idx_found != 1 and dim_idx_found != 4:
|
|
raise Error('Can\'t determine channels dimension for 5D input {} (NCDHW or NDHWC) with shape {}. '
|
|
'Please specify layout containing \'C\' channels manually.'.format(name, shape))
|
|
else:
|
|
raise Error('Can\'t determine channels dimension for {}D input {} with shape {}.'
|
|
'Please specify layout containing \'C\' channels manually.'
|
|
.format(shape.rank.get_length(), name, shape))
|
|
|
|
layout_str = "?" * shape.rank.get_length()
|
|
layout_str = layout_str[:dim_idx_found] + 'C' + layout_str[dim_idx_found + 1:]
|
|
layout_values[name] = {
|
|
'source_layout': layout_str,
|
|
'target_layout': None,
|
|
'source_guessed': True,
|
|
'is_input': True
|
|
}
|
|
return layout_values
|
|
|
|
|
|
def update_tensor_names_to_first_in_sorted_list(values_dict: dict, ov_function: Model):
|
|
if not isinstance(values_dict, dict):
|
|
return values_dict
|
|
updated_dict = {}
|
|
used_nodes = {}
|
|
for name, value in values_dict.items():
|
|
input_found = False
|
|
for input in ov_function.inputs:
|
|
tensor_names = list(input.names)
|
|
tensor_names.sort()
|
|
if not (name in tensor_names or name == input.node.get_friendly_name()):
|
|
continue
|
|
if input in used_nodes:
|
|
raise Error("Tensor names {} and {} refer to the same node.".format(name, used_nodes[input]))
|
|
used_nodes.update({input: name})
|
|
updated_dict[tensor_names[0]] = value
|
|
input_found = True
|
|
break
|
|
if not input_found:
|
|
raise Error('Input with name {} wasn\'t found! {}'.format(name, refer_to_faq_msg(83)))
|
|
|
|
return updated_dict
|
|
|
|
|
|
def apply_preprocessing(ov_function: Model, argv: argparse.Namespace):
|
|
"""
|
|
Applies pre-processing of model inputs by adding appropriate operations
|
|
On return, 'ov_function' object will be updated
|
|
Expected 'argv.mean_scale_values' formats examples:
|
|
a) Dict: {'inputName': {'mean': [1., 2., 3.], 'scale': [2., 4., 8.]}}
|
|
b) List: list(np.array([(np.array([1., 2., 3.]), np.array([2., 4., 6.])),
|
|
(np.array([7., 8., 9.]), np.array([5., 6., 7.])))
|
|
Expected 'argv.layout_values' format examples:
|
|
a) Specific layouts for inputs and outputs
|
|
{ 'input1': {
|
|
'source_layout': 'nchw',
|
|
'target_layout': 'nhwc'
|
|
},
|
|
'output2': {
|
|
'source_layout': 'nhwc'
|
|
}
|
|
}
|
|
b) Layout for single input: {'': {'source_layout': 'nchw'}}
|
|
:param: ov_function OV function for applying mean/scale pre-processing
|
|
:param: argv Parsed command line arguments
|
|
"""
|
|
prep = PrePostProcessor(ov_function)
|
|
|
|
layout_values = {}
|
|
if 'layout_values' in argv and argv.layout_values:
|
|
layout_values = update_layout_to_dict(ov_function.inputs, argv.layout_values,
|
|
lambda ov_input: ov_input.get_tensor().get_names())
|
|
|
|
check_keys_valid(ov_function=ov_function, dict_to_validate=layout_values, search_outputs=True)
|
|
|
|
layout_values = update_layout_is_input_flag(ov_function, layout_values)
|
|
|
|
for node_name, layout_value in layout_values.items():
|
|
if layout_value.get('source_layout'):
|
|
if layout_value.get('is_input'):
|
|
prep.input(node_name).model().set_layout(Layout(layout_value['source_layout']))
|
|
else:
|
|
prep.output(node_name).model().set_layout(Layout(layout_value['source_layout']))
|
|
if layout_value.get('target_layout'):
|
|
if layout_value.get('is_input'):
|
|
prep.input(node_name).tensor().set_layout(Layout(layout_value['target_layout']))
|
|
else:
|
|
prep.output(node_name).tensor().set_layout(Layout(layout_value['target_layout']))
|
|
|
|
# Apply pre-processing builder to a function
|
|
ov_function = prep.build()
|
|
|
|
# Remove guessed layout values from ov_function (these values shall not be serialized to IR
|
|
for node_name, layout_value in layout_values.items():
|
|
if layout_value.get('source_guessed') and \
|
|
not layout_value.get('target_layout'):
|
|
# search for parameter object
|
|
for idx, ov_input in enumerate(ov_function.inputs):
|
|
if node_name in ov_input.get_tensor().get_names():
|
|
log.debug('Clearing guessed layout {} for {}'
|
|
.format(layout_value['source_layout'], node_name))
|
|
ov_function.get_parameters()[idx].layout = Layout()
|