Files

448 lines
18 KiB
Python
Raw Permalink Normal View History

# Copyright (C) 2018-2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
import re
from enum import Enum
import numpy as np
from openvino._pyopenvino import Place, PartialShape # pylint: disable=no-name-in-module,import-error
from openvino.frontend import InputModel # pylint: disable=no-name-in-module,import-error
from openvino.tools.ovc.error import Error
def raise_no_node(node_name: str):
raise Error('No node with name {}'.format(node_name))
def raise_node_name_collision(node_name: str, found_nodes: list):
raise Error('Name collision was found, there are several nodes for mask "{}": {}. '
'If your intention was to specify port for node, please instead specify node names connected to '
'this port. If your intention was to specify the node name, please add port to the node '
'name'.format(node_name, found_nodes))
class IOType(Enum):
Input = 1
Output = 2
def decode_name_with_port(
input_model: InputModel, node_name: str, framework="", io_type=IOType.Input
) -> Place or None:
"""
Decode name with optional port specification w/o traversing all the nodes in the graph
TODO: in future node_name can specify input/output port groups as well as indices (58562)
:param input_model: Input Model
:param node_name: user provided node name
:return: decoded place in the graph
"""
found_places = []
found_place_names = []
def get_place_by_operation_name(input_model, name, framework, io_type):
node = input_model.get_place_by_operation_name(name)
if node and framework == "onnx":
if io_type == IOType.Input:
return (
node.get_input_port(input_port_index=0)
.get_producing_port()
.get_target_tensor()
)
else:
return node.get_output_port(output_port_index=0).get_target_tensor()
return node
# find by tensor name
place = input_model.get_place_by_tensor_name(node_name)
if place:
found_place_names.append("Tensor:" + node_name)
found_places.append(place)
else:
# find by operation name
place = get_place_by_operation_name(input_model, node_name, framework, io_type)
name = node_name
if framework == "onnx" and io_type == IOType.Output:
name = "Tensor:" + name
if place:
found_place_names.append(name)
found_places.append(place)
def try_get_node(model, name, framework):
node = model.get_place_by_operation_name(name)
if node:
return node
if framework == "onnx":
tensor = model.get_place_by_tensor_name(name)
if tensor:
if tensor.is_input() or tensor.is_output():
return tensor
return tensor.get_producing_operation()
return None
def get_port(match, match_starts_with_name, input_model, framework):
if not match:
return None
if match_starts_with_name:
name = match.group(1)
port_index = match.group(2)
else:
name = match.group(2)
port_index = match.group(1)
node = try_get_node(input_model, name, framework)
if node:
# if regular expression has structure <name>:<port>, get node output port.
# Otherwise get node input port
if match_starts_with_name:
return node.get_output_port(output_port_index=int(port_index))
else:
return node.get_input_port(input_port_index=int(port_index))
else:
return None
regexp_post = r"(.+):(\d+)"
match = re.search(regexp_post, node_name)
match_port = get_port(
match=match,
match_starts_with_name=True,
input_model=input_model,
framework=framework,
)
if match_port:
name = match.group(1)
if framework == "onnx":
found_place_names.append("Tensor:" + name)
found_places.append(match_port.get_target_tensor())
else:
found_place_names.append(name)
found_places.append(match_port)
regexp_pre = r"(\d+):(.+)"
match = re.search(regexp_pre, node_name)
match_port = get_port(
match=match,
match_starts_with_name=False,
input_model=input_model,
framework=framework,
)
if match_port:
name = match.group(2)
if framework == "onnx":
found_place_names.append("Tensor:" + name)
found_places.append(match_port.get_producing_port().get_target_tensor())
else:
found_places.append(match_port)
found_place_names.append(name)
if len(found_places) == 0:
raise_no_node(node_name)
# Check that there is no collision, all found places shall point to same data
if not all([n.is_equal_data(found_places[0]) for n in found_places]):
raise_node_name_collision(node_name, found_place_names)
# TODO: Add support for input/output group name and port index here (58562)
# For new frontends logic shall be extended to additionally support input and output group names
return found_places[0]
def fe_input_user_data_repack(
input_model: InputModel,
input_user_shapes: [None, list, dict, np.ndarray],
freeze_placeholder: dict,
framework: str,
input_user_data_types=None,
):
"""
Restructures user input cutting request. Splits ports out of node names.
Transforms node names to node ids.
:param input_model: current input model
:param input_user_shapes: data structure representing user input cutting request. It may be:
# None value if user did not provide neither "input" nor "input_shape" keys
# list instance which contains input layer names with or without ports if user provided
only "input" key
# dict instance which contains input layer names with or without ports as keys and shapes as
values if user provided both "input" and "input_shape"
# np.ndarray if user provided only "input_shape" key
:param freeze_placeholder: dictionary with placeholder names as keys and freezing value as values
:param input_user_data_types: dictionary with input nodes and its data types
:return: restructured input shapes and freeze placeholder shapes information
Example of input dictionary:
_input_shapes =
{
'node_ID':
[
{'shape': None, 'in': 0},
{'shape': None, 'in': 1},
],
'node_1_ID':
[
{'shape': [1, 227, 227, 3], 'port': None, 'data_type': np.int32}
],
'node_2_ID':
[
{'shape': None, 'out': 3}
]
}
Example of freeze placeholder dictionary:
_freeze_placeholder =
{
'phase_train' : False
}
"""
_input_shapes = []
_input_names = []
model_inputs = input_model.get_inputs()
if isinstance(input_user_shapes, list) and len(input_user_shapes) > 1 and isinstance(input_user_shapes[0],
PartialShape):
for shape in input_user_shapes:
assert isinstance(shape, PartialShape), "Got incorrect format of input shapes."
assert len(model_inputs) == len(input_user_shapes)
for idx, model_input in enumerate(model_inputs):
_input_shapes.append({"node": model_input, "shape": input_user_shapes[idx]})
elif isinstance(input_user_shapes, list) or isinstance(input_user_shapes, dict):
for input_name in input_user_shapes:
node = decode_name_with_port(
input_model, input_name, framework, IOType.Input
)
if node is None:
raise Error(
"Cannot find location {} in the input model".format(input_name)
)
shape = (
None
if isinstance(input_user_shapes, list)
else input_user_shapes[input_name]
)
if isinstance(input_user_data_types, dict) and input_user_data_types.get(input_name) is not None:
data_type = input_user_data_types[input_name]
_input_shapes.append(
{
"node": node,
"shape": shape,
"data_type": data_type,
"input_name": input_name,
}
)
else:
_input_shapes.append(
{
"node": node,
"shape": shape,
"input_name": input_name
}
)
_input_names.append(input_name)
elif isinstance(input_user_shapes, PartialShape):
# this branch covers the single use of `input_shape` without `input` option
# but it can be used along with `freeze_placeholder_with_value` option
# for example, input_shape [3] freeze_placeholder_with_value "is_training->False"
# means the model has two inputs: one is is_training to be frozen, the other to re-write the shape
# NOTE: the logic relies on parameters with the single name
frozen_names = freeze_placeholder.keys()
assert len(model_inputs) == len(frozen_names) + 1, \
"Please check the conversion command-line. Total number of model inputs ({} detected) " \
"must match to a number of input shapes along with frozen inputs ({} in total).".format(
len(model_inputs),
len(frozen_names) + 1)
for node in model_inputs:
assert len(node.get_names()) > 0, "Original model inputs must have tensor names."
input_name = node.get_names()[0]
if input_name not in frozen_names:
_input_shapes.append(
{
"node": node,
"shape": input_user_shapes,
"input_name": input_name
}
)
# case when single unnamed input shape and type was specified
if input_name in input_user_data_types:
_input_shapes[-1]['data_type'] = input_user_data_types[input_name]
_input_names.append(input_name)
break
else:
# this case means that we use original inputs of the model
# and they should not be changed and their properties (shape and type) should not be over-written
# NOTE: the logic relies on parameters with the single name
assert input_user_shapes is None
for node in model_inputs:
assert len(node.get_names()) > 0, "Original model inputs must have tensor names."
input_name = node.get_names()[0]
_input_shapes.append(
{
"node": node,
"input_name": input_name
}
)
# case when types were specified for unnamed inputs
if input_name in input_user_data_types:
_input_shapes[-1]['data_type'] = input_user_data_types[input_name]
# mark-up Place names we already put into the _input_names
# to avoid duplicates in updates by freeze_placeholder below
_input_names.append(input_name)
if freeze_placeholder:
# in case freezing via freeze_placeholder_with_value option, _input_shapes can miss some frozen places
for input_name in freeze_placeholder:
if input_name in _input_names:
continue
node = decode_name_with_port(
input_model, input_name, framework, IOType.Input
)
_input_shapes.append(
{
"node": node,
"input_name": input_name
}
)
return _input_shapes, freeze_placeholder
return _input_shapes, dict()
def fe_output_user_data_repack(input_model: InputModel, outputs: list, framework: str):
"""
:param input_model: Input Model to operate on
:param outputs: list of node names provided by user
:return: dictionary with node IDs as keys and list of port dictionaries as values
Example of outputs dictionary:
_outputs =
{
'node_ID':
[
{'out': 0},
{'out': 1},
],
'node_1_ID':
[
{'port': None}
],
'node_2_ID':
[
{'in': 3}
]
}
"""
_outputs = []
if outputs is not None:
for output in outputs:
node = decode_name_with_port(input_model, output, framework, IOType.Output)
if node is None:
raise Error("Cannot find location {} in the graph".format(output))
_outputs.append({"node": node, "output_name": output})
return _outputs
def find_first_unused_input(model_inputs: list, param_dict: dict, param_name: str):
"""
Finds first input in model_inputs, which is not present in freeze_placeholder dictionary or param_dict.
:param model_inputs: list of model inputs
:param param_dict: dictionary where key is input name, value is parameter value (shape or type).
:param param_name: name of parameter used in exception message.
:return: first input name, which is not present in freeze_placeholder dictionary or param_dict.
"""
for inp in model_inputs:
input_names = inp.get_names()
name_found = False
for input_name in input_names:
if input_name in param_dict:
name_found = True
break
if name_found:
continue
return input_names[0]
raise Error("Could not set {}, as model does not have enough inputs.".format(param_name))
def convert_params_lists_to_dicts(input_model,
input_user_shapes: [list, dict],
input_user_data_types: [list, dict]):
"""
Convert lists of unnamed params to dicts using input names from input_model.
:param input_model: openvino.InputModel
:param input_user_shapes: list of input shapes or dictionary where key is input name, value is input shape from user.
:param input_user_data_types: list of input types or dictionary where key is input name, value is input type from user.
:return: (input_user_shapes_dict, input_user_data_types_dict, freeze_placeholder), where
input_user_shapes_dict - dictionary where key is input name, value is shape from user;
input_user_data_types_dict - dictionary where key is input name, value is type from user;
freeze_placeholder - dictionary where key is input name, value is input value from user;
"""
from openvino import PartialShape # pylint: disable=no-name-in-module,import-error
model_inputs = input_model.get_inputs()
input_user_data_types_dict = {}
input_user_shapes_dict = {}
# input_user_shapes is list only if unnamed inputs were used
if isinstance(input_user_shapes, list):
# this cycle adds each unnamed shape to dictionary using name from model_inputs
for idx, shape in enumerate(input_user_shapes):
assert isinstance(shape, PartialShape), "Got incorrect format of input shapes {}.".format(type(shape))
inp_name = find_first_unused_input(model_inputs, input_user_shapes_dict, "shape")
input_user_shapes_dict[inp_name] = shape
else:
input_user_shapes_dict = input_user_shapes
# input_user_data_types is list only if unnamed inputs were used
if isinstance(input_user_data_types, list):
from openvino import Type # pylint: disable=no-name-in-module,import-error
if input_user_shapes_dict is None:
input_user_shapes_dict = {}
# this cycle adds each unnamed type to dictionary using name from model_inputs
for idx, node_type in enumerate(input_user_data_types):
assert isinstance(node_type, (type, np.dtype, Type)), "Got incorrect format of input types. " \
"Expected numpy type or openvino.Type, " \
"got {}.".format(type(node_type))
inp_name = find_first_unused_input(model_inputs, input_user_data_types_dict, "type")
input_user_data_types_dict[inp_name] = node_type
# FE postprocessing expects input_user_shapes_dict to always have shapes for corresponding types.
# If shape is not set it is expected to have None shape in input_user_shapes_dict dictionary.
if inp_name not in input_user_shapes_dict:
input_user_shapes_dict[inp_name] = None
else:
input_user_data_types_dict = input_user_data_types
return input_user_shapes_dict, input_user_data_types_dict
def fe_user_data_repack(
input_model: InputModel,
input_user_shapes: [None, list, dict, np.array],
input_user_data_types: dict,
outputs: list,
freeze_placeholder: dict,
framework: str,
):
"""
:param input_model: Input Model to operate on
:param input_user_shapes: data structure representing user input cutting request
:param input_user_data_types: dictionary with input nodes and its data types
:param outputs: list of node names to treat as outputs
:param freeze_placeholder: dictionary with placeholder names as keys and freezing value as values
:return: restructured input, output and freeze placeholder dictionaries or None values
"""
_input_shapes, _freeze_placeholder = fe_input_user_data_repack(
input_model,
input_user_shapes,
freeze_placeholder,
framework,
input_user_data_types=input_user_data_types,
)
_outputs = fe_output_user_data_repack(input_model, outputs, framework)
return _input_shapes, _outputs, _freeze_placeholder