#ifndef ANSCUSTOMPY_H #define ANSCUSTOMPY_H #pragma once #include "ANSEngineCommon.h" #include #include #include #include #include #include #include #include #include #include #include #include #include namespace py = pybind11; namespace ANSCENTER { struct ModelWrapper { std::unordered_map> map; std::function modelFactory; }; class PythonRuntime { public: static bool IsInitialized(); static void Shutdown(); void AddToSysPath(const std::string& path); friend PythonRuntime& GetPythonRuntime(const std::wstring& pythonHome); private: explicit PythonRuntime(const std::wstring& home); void InitializeInterpreter(); void PatchPythonStreamsSafe(); static std::once_flag initFlag; static std::unique_ptr instance; static std::atomic _isInitialized; static std::atomic shuttingDown; static std::mutex _instanceMutex; std::wstring pythonHome; }; PythonRuntime& GetPythonRuntime(const std::wstring& pythonHome); class ANSENGINE_API ANSCUSTOMPY : public ANSODBase { public: ANSCUSTOMPY(); ~ANSCUSTOMPY(); bool Initialize(std::string licenseKey, ModelConfig modelConfig, const std::string& modelZipFilePath, const std::string& modelZipPassword, std::string& labelMap) override; bool LoadModel(const std::string& modelZipFilePath, const std::string& modelZipPassword) override; bool LoadModelFromFolder(std::string licenseKey, ModelConfig modelConfig, std::string modelName, std::string className, const std::string& modelFolder, std::string& labelMap) override; bool ConfigureParameters(Params& param) override; bool SetParameters(const Params& param) override; bool OptimizeModel(bool fp16, std::string& optimizedModelFolder); std::vector RunInference(const cv::Mat& input) override; std::vector RunInference(const cv::Mat& input, const std::string& camera_id) override; bool Destroy() override; static void SafeShutdownAll(bool waitForShutdown); private: bool _destroyed = false; bool _isInitialized = false; std::recursive_mutex _mutex; pybind11::object pyModelClass; std::shared_ptr _wrapper; int _instanceId; std::string _scriptPath; std::string _moduleName; bool InitializePythonModel(const std::string& moduleName, const std::string& fullPath); pybind11::object GetThreadLocalModel(); bool EnsureModelReady(const std::string& methodName); void ClearThreadState(); void RegisterSelf(); void UnregisterSelf(); std::string RunPythonInferenceFromMat(const cv::Mat& image); static std::mutex _globalInstancesMutex; static std::unordered_set _globalInstances; static std::atomic globalInstanceCounter; }; } // namespace ANSCENTER #endif // ANSCUSTOMPY_H //import numpy as np //import os //import json //from ultralytics import YOLO //from typing import List // //class ROIPoint : // def __init__(self, x, y) : // self.x = x // self.y = y // // def to_dict(self) : // return { "x": self.x, "y" : self.y } // // class ROIConfig : // def __init__(self, rectangle, polygon, line, min_items, max_items, name, roi_match) : // self.Rectangle = rectangle // self.Polygon = polygon // self.Line = line // self.MinItems = min_items // self.MaxItems = max_items // self.Name = name // self.ROI_Match = roi_match // // def to_dict(self) : // return { // "Rectangle": self.Rectangle, // "Polygon" : self.Polygon, // "Line" : self.Line, // "MinItems" : self.MinItems, // "MaxItems" : self.MaxItems, // "Name" : self.Name, // "ROI-Match" : self.ROI_Match //} // //class Parameter : // def __init__(self, name, datatype, no_of_decimals, max_val, min_val, start_val, list_items, default_val, value) : // self.Name = name // self.DataType = datatype // self.NoOfdecimals = no_of_decimals // self.MaxValue = max_val // self.MinValue = min_val // self.StartValue = start_val // self.ListItems = list_items // self.DefaultValue = default_val // self.Value = value // // def to_dict(self) : // return { // "Name": self.Name, // "DataType" : self.DataType, // "NoOfdecimals" : self.NoOfdecimals, // "MaxValue" : self.MaxValue, // "MinValue" : self.MinValue, // "StartValue" : self.StartValue, // "ListItems" : self.ListItems, // "DefaultValue" : self.DefaultValue, // "Value" : self.Value //} // //class ROIValue : // def __init__(self, roi_match, roi_points, option, name, original_image_size) : // self.ROI_Match = roi_match // self.ROIPoints = roi_points # list of ROIPoint // self.Option = option // self.Name = name // self.OriginalImageSize = original_image_size // // def to_dict(self) : // return { // "ROI-Match": self.ROI_Match, // "ROIPoints" : [p.to_dict() for p in self.ROIPoints] , // "Option" : self.Option, // "Name" : self.Name, // "OriginalImageSize" : self.OriginalImageSize //} // //class Params : // def __init__(self) : // self.ROI_Config : List[ROIConfig] = [] // self.ROI_Options : List[str] = [] // self.Parameters : List[Parameter] = [] // self.ROI_Values : List[ROIValue] = [] // // def to_dict(self) : // return { // "ROI_Config": [cfg.to_dict() for cfg in self.ROI_Config] , // "ROI_Options" : self.ROI_Options, // "Parameters" : [p.to_dict() for p in self.Parameters] , // "ROI_Values" : [v.to_dict() for v in self.ROI_Values] //} // //def to_json(self) : // return json.dumps(self.to_dict(), indent = 2) // // class ANSModel : // def __init__(self, model_name = "yolo11n.pt") : // current_dir = os.path.dirname(os.path.abspath(__file__)) // model_path = os.path.join(current_dir, model_name) // self.model = YOLO(model_path) // // # Define empty but structured parameter dictionary // self.params = { // "ROI_Config": [] , // "ROI_Options" : [] , // "Parameters" : [] , // "ROI_Values" : [] //} // //def run_inference(self, image_bytes: bytes, width : int, height : int, channels : int)->str: //np_arr = np.frombuffer(image_bytes, dtype = np.uint8) //image = np_arr.reshape((height, width, channels)) // //results = self.model(image) //output = { "results": [] } // //for result in results : //for box in result.boxes : // cls_id = int(box.cls[0]) // conf = float(box.conf[0]) // class_name = self.model.names[cls_id] // x1, y1, x2, y2 = box.xyxy[0].tolist() // x = int(x1) // y = int(y1) // w = int(x2 - x1) // h = int(y2 - y1) // // output["results"].append({ // "class_id": cls_id, // "track_id" : -1, // "class_name" : class_name, // "prob" : conf, // "x" : x, // "y" : y, // "width" : w, // "height" : h, // "mask" : "", // "extra_info" : "", // "camera_id" : "", // "polygon" : "", // "kps" : "" // }) // // return json.dumps(output) // // def model_optimize(self, fp16: bool) -> bool: //try : // print(f"[model_optimize] Called with fp16={fp16}") // # self.model.export(...) # real optimization here // return True // except Exception as e : //print(f"[model_optimize] Error: {e}") //return False // //def configureParameters(self)->str : //# --- Add ROI_Config entries --- // self.params["ROI_Config"].append({ // "Rectangle": True, // "Polygon" : True, // "Line" : False, // "MinItems" : 0, // "MaxItems" : 3, // "Name" : "Traffic Light", // "ROI-Match" : "All Corners" // }) // // self.params["ROI_Config"].append({ // "Rectangle": True, // "Polygon" : False, // "Line" : False, // "MinItems" : 1, // "MaxItems" : 1, // "Name" : "Car Zone", // "ROI-Match" : "All Corners" // }) // // self.params["ROI_Config"].append({ // "Rectangle": False, // "Polygon" : False, // "Line" : True, // "MinItems" : 1, // "MaxItems" : 2, // "Name" : "Cross Line", // "ROI-Match" : "All Corners" // }) // //# --- Add ROI_Options entries --- // self.params["ROI_Options"] += ["Inside ROI", "Inside ROI", "Both Directions"] // //# --- Add Parameters entries --- // self.params["Parameters"].append({ // "Name": "Para1", // "DataType" : "Boolean", // "NoOfdecimals" : 0, // "MaxValue" : 0, // "MinValue" : 0, // "StartValue" : "", // "ListItems" : [] , // "DefaultValue" : "", // "Value" : "true" // }) // // self.params["Parameters"].append({ // "Name": "Para2", // "DataType" : "Integer", // "NoOfdecimals" : 0, // "MaxValue" : 5, // "MinValue" : 1, // "StartValue" : "2", // "ListItems" : [] , // "DefaultValue" : "", // "Value" : "3" // }) // // self.params["Parameters"].append({ // "Name": "Para3", // "DataType" : "List-Single", // "NoOfdecimals" : 0, // "MaxValue" : 0, // "MinValue" : 0, // "StartValue" : "", // "ListItems" : ["A", "B", "C"] , // "DefaultValue" : "", // "Value" : "A" // }) // // self.params["Parameters"].append({ // "Name": "Para4", // "DataType" : "Range", // "NoOfdecimals" : 0, // "MaxValue" : 100, // "MinValue" : 50, // "StartValue" : ">,60", // "ListItems" : [">", "<"] , // "DefaultValue" : "", // "Value" : ">,52.000000" // }) // return json.dumps(self.params) // // def setParameters(self, json_str: str) -> bool: //try : // self.params = json.loads(json_str) // return True // except Exception as e : //print(f"[setParameters] Failed to parse parameters: {e}") //return False