Files
ANSCORE/engines/OpenVINOEngine/include/utils/threads_common.hpp

161 lines
4.5 KiB
C++

// Copyright (C) 2018-2024 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
#pragma once
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <utility>
#include <set>
#include <string>
#include <thread>
#include <vector>
#include <opencv2/core/core.hpp>
#include "utils/performance_metrics.hpp"
// A frame travelling through the processing pipeline, tagged with the source
// that produced it and its sequence number within that source.
// VideoFrame can represent not a single image but the whole grid.
class VideoFrame {
public:
    // Shared-ownership handle used to pass frames between tasks.
    using Ptr = std::shared_ptr<VideoFrame>;

    // sourceID - identifier of the input the frame belongs to.
    // frameId  - sequence number of the frame.
    // frame    - pixel data; defaults to an empty cv::Mat.
    VideoFrame(unsigned sourceID, int64_t frameId, const cv::Mat& frame = cv::Mat())
        : sourceID{sourceID},
          frameId{frameId},
          frame{frame} {}

    // Virtual so derived frame types are destroyed correctly through a Ptr.
    virtual ~VideoFrame() = default;  // A user has to define how it is reconstructed

    const unsigned sourceID;  // fixed at construction
    const int64_t frameId;    // fixed at construction
    cv::Mat frame;            // mutable image payload
    PerformanceMetrics::TimePoint timestamp;  // not set by the constructor
};
class Worker;

// Abstract unit of work scheduled by a Worker.
//
// Worker::threadFunc() calls isReady() while it holds the task-queue mutex
// and calls process() after releasing it.
class Task {
public:
    // sharedVideoFrame - frame this task operates on (may be shared with other tasks).
    // priority         - scheduling weight; HigherPriority orders larger values first.
    explicit Task(VideoFrame::Ptr sharedVideoFrame, float priority = 0)
        : sharedVideoFrame{sharedVideoFrame},
          priority{priority} {}

    // Returns true when the task can be processed right now.
    virtual bool isReady() = 0;

    // Performs the work of the task.
    virtual void process() = 0;

    virtual ~Task() = default;

    std::string name;  // label for the task; not set by the constructor
    VideoFrame::Ptr sharedVideoFrame;  // it is possible that two tasks try to draw on the same cvMat
    const float priority;  // fixed at construction
};
// Strict ordering of tasks for the Worker's task set.
//
// Tasks are ordered by, in turn:
//   1. larger priority first,
//   2. smaller frameId first,
//   3. shared_ptr comparison, so two distinct tasks never compare equivalent
//      and both can live in the same std::set.
struct HigherPriority {
    bool operator()(const std::shared_ptr<Task>& lhs, const std::shared_ptr<Task>& rhs) const {
        if (lhs->priority != rhs->priority) {
            return lhs->priority > rhs->priority;
        }

        const auto lhsFrame = lhs->sharedVideoFrame->frameId;
        const auto rhsFrame = rhs->sharedVideoFrame->frameId;
        if (lhsFrame != rhsFrame) {
            return lhsFrame < rhsFrame;
        }

        return lhs < rhs;
    }
};
class Worker {
public:
explicit Worker(unsigned threadNum):
threadPool(threadNum), running{false} {}
~Worker() {
stop();
}
void runThreads() {
running = true;
for (std::thread& t : threadPool) {
t = std::thread(&Worker::threadFunc, this);
}
}
void push(std::shared_ptr<Task> task) {
tasksMutex.lock();
tasks.insert(task);
tasksMutex.unlock();
tasksCondVar.notify_one();
}
void threadFunc() {
while (running) {
std::unique_lock<std::mutex> lk(tasksMutex);
while (running && tasks.empty()) {
tasksCondVar.wait(lk);
}
try {
auto it = std::find_if(tasks.begin(), tasks.end(), [](const std::shared_ptr<Task>& task){return task->isReady();});
if (tasks.end() != it) {
const std::shared_ptr<Task> task = std::move(*it);
tasks.erase(it);
lk.unlock();
task->process();
}
} catch (...) {
std::lock_guard<std::mutex> lock{exceptionMutex};
if (nullptr == currentException) {
currentException = std::current_exception();
stop();
}
}
}
}
void stop() {
running = false;
tasksCondVar.notify_all();
}
void join() {
for (auto& t : threadPool) {
t.join();
}
if (nullptr != currentException) {
std::rethrow_exception(currentException);
}
}
private:
std::condition_variable tasksCondVar;
std::set<std::shared_ptr<Task>, HigherPriority> tasks;
std::mutex tasksMutex;
std::vector<std::thread> threadPool;
std::atomic<bool> running;
std::exception_ptr currentException;
std::mutex exceptionMutex;
};
// Pairs a container with a mutex and offers locked convenience operations.
//
// C must provide empty(), size(), push_back(), back() and pop_back().
// `container` and `mutex` are public so callers can lock the mutex themselves
// for compound operations; the locked*() helpers must not be called while the
// calling thread already holds `mutex`.
template <class C> class ConcurrentContainer {
public:
    C container;
    mutable std::mutex mutex;

    // Returns true if the container is empty at the moment of the call.
    bool lockedEmpty() const noexcept {
        std::lock_guard<std::mutex> lock{mutex};
        return container.empty();
    }

    // Returns the number of elements at the moment of the call.
    typename C::size_type lockedSize() const noexcept {
        std::lock_guard<std::mutex> lock{mutex};
        return container.size();
    }

    // Appends a copy of value.
    void lockedPushBack(const typename C::value_type& value) {
        std::lock_guard<std::mutex> lock{mutex};
        container.push_back(value);
    }

    // Removes the last element and stores it in value.
    // Returns false, leaving value untouched, if the container is empty.
    bool lockedTryPop(typename C::value_type& value) {
        // Scoped lock: if the assignment below throws, the mutex is still
        // released and the element stays in the container.
        std::lock_guard<std::mutex> lock{mutex};
        if (container.empty()) {
            return false;
        }
        value = std::move(container.back());
        container.pop_back();
        return true;
    }

    // Returns a snapshot copy of the container taken under the lock.
    operator C() const {
        std::lock_guard<std::mutex> lock{mutex};
        return container;
    }
};