123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502 |
- #ifndef OPENCV_GAPI_PIPELINE_MODELING_TOOL_PIPELINE_BUILDER_HPP
- #define OPENCV_GAPI_PIPELINE_MODELING_TOOL_PIPELINE_BUILDER_HPP
- #include <map>
- #include <opencv2/gapi/infer.hpp> // cv::gapi::GNetPackage
- #include <opencv2/gapi/streaming/cap.hpp> // cv::gapi::wip::IStreamSource
- #include <opencv2/gapi/infer/ie.hpp> // cv::gapi::ie::Params
- #include <opencv2/gapi/gcommon.hpp> // cv::gapi::GCompileArgs
- #include <opencv2/gapi/cpu/gcpukernel.hpp> // GAPI_OCV_KERNEL
- #include <opencv2/gapi/gkernel.hpp> // G_API_OP
- #include "pipeline.hpp"
- #include "utils.hpp"
- struct Edge {
- struct P {
- std::string name;
- size_t port;
- };
- P src;
- P dst;
- };
- struct CallNode {
- using F = std::function<void(const cv::GProtoArgs&, cv::GProtoArgs&)>;
- std::string name;
- F run;
- };
- struct DataNode {
- cv::optional<cv::GProtoArg> arg;
- };
- struct Node {
- using Ptr = std::shared_ptr<Node>;
- using WPtr = std::weak_ptr<Node>;
- using Kind = cv::util::variant<CallNode, DataNode>;
- std::vector<Node::WPtr> in_nodes;
- std::vector<Node::Ptr> out_nodes;
- Kind kind;
- };
- struct DummyCall {
- G_API_OP(GDummy,
- <cv::GMat(cv::GMat, double, OutputDescr)>,
- "custom.dummy") {
- static cv::GMatDesc outMeta(const cv::GMatDesc& /* in */,
- double /* time */,
- const OutputDescr& output) {
- if (output.dims.size() == 2) {
- return cv::GMatDesc(output.precision,
- 1,
- cv::Size(output.dims[0], output.dims[1]));
- }
- return cv::GMatDesc(output.precision, output.dims);
- }
- };
- struct DummyState {
- cv::Mat mat;
- };
- // NB: Generate random mat once and then
- // copy to dst buffer on every iteration.
- GAPI_OCV_KERNEL_ST(GCPUDummy, GDummy, DummyState) {
- static void setup(const cv::GMatDesc& /*in*/,
- double /*time*/,
- const OutputDescr& output,
- std::shared_ptr<DummyState>& state,
- const cv::GCompileArgs& /*args*/) {
- state.reset(new DummyState{});
- utils::createNDMat(state->mat, output.dims, output.precision);
- utils::generateRandom(state->mat);
- }
- static void run(const cv::Mat& /*in_mat*/,
- double time,
- const OutputDescr& /*output*/,
- cv::Mat& out_mat,
- DummyState& state) {
- using namespace std::chrono;
- double total = 0;
- auto start = high_resolution_clock::now();
- state.mat.copyTo(out_mat);
- while (total < time) {
- total = duration_cast<duration<double, std::milli>>(
- high_resolution_clock::now() - start).count();
- }
- }
- };
- void operator()(const cv::GProtoArgs& inputs, cv::GProtoArgs& outputs);
- size_t numInputs() const { return 1; }
- size_t numOutputs() const { return 1; }
- double time;
- OutputDescr output;
- };
- void DummyCall::operator()(const cv::GProtoArgs& inputs,
- cv::GProtoArgs& outputs) {
- GAPI_Assert(inputs.size() == 1u);
- GAPI_Assert(cv::util::holds_alternative<cv::GMat>(inputs[0]));
- GAPI_Assert(outputs.empty());
- auto in = cv::util::get<cv::GMat>(inputs[0]);
- outputs.emplace_back(GDummy::on(in, time, output));
- }
- struct InferCall {
- void operator()(const cv::GProtoArgs& inputs, cv::GProtoArgs& outputs);
- size_t numInputs() const { return input_layers.size(); }
- size_t numOutputs() const { return output_layers.size(); }
- std::string tag;
- std::vector<std::string> input_layers;
- std::vector<std::string> output_layers;
- };
- void InferCall::operator()(const cv::GProtoArgs& inputs,
- cv::GProtoArgs& outputs) {
- GAPI_Assert(inputs.size() == input_layers.size());
- GAPI_Assert(outputs.empty());
- cv::GInferInputs g_inputs;
- // TODO: Add an opportunity not specify input/output layers in case
- // there is only single layer.
- for (size_t i = 0; i < inputs.size(); ++i) {
- // TODO: Support GFrame as well.
- GAPI_Assert(cv::util::holds_alternative<cv::GMat>(inputs[i]));
- auto in = cv::util::get<cv::GMat>(inputs[i]);
- g_inputs[input_layers[i]] = in;
- }
- auto g_outputs = cv::gapi::infer<cv::gapi::Generic>(tag, g_inputs);
- for (size_t i = 0; i < output_layers.size(); ++i) {
- outputs.emplace_back(g_outputs.at(output_layers[i]));
- }
- }
- struct SourceCall {
- void operator()(const cv::GProtoArgs& inputs, cv::GProtoArgs& outputs);
- size_t numInputs() const { return 0; }
- size_t numOutputs() const { return 1; }
- };
- void SourceCall::operator()(const cv::GProtoArgs& inputs,
- cv::GProtoArgs& outputs) {
- GAPI_Assert(inputs.empty());
- GAPI_Assert(outputs.empty());
- // NB: Since NV12 isn't exposed source always produce GMat.
- outputs.emplace_back(cv::GMat());
- }
- struct LoadPath {
- std::string xml;
- std::string bin;
- };
- struct ImportPath {
- std::string blob;
- };
- using ModelPath = cv::util::variant<ImportPath, LoadPath>;
- struct InferParams {
- std::string name;
- ModelPath path;
- std::string device;
- std::vector<std::string> input_layers;
- std::vector<std::string> output_layers;
- std::map<std::string, std::string> config;
- };
- class PipelineBuilder {
- public:
- PipelineBuilder();
- void addDummy(const std::string& name,
- const double time,
- const OutputDescr& output);
- void addInfer(const std::string& name, const InferParams& params);
- void setSource(const std::string& name,
- double latency,
- const OutputDescr& output);
- void addEdge(const Edge& edge);
- void setMode(PLMode mode);
- void setDumpFilePath(const std::string& dump);
- void setQueueCapacity(const size_t qc);
- void setName(const std::string& name);
- Pipeline::Ptr build();
- private:
- template <typename CallT>
- void addCall(const std::string& name,
- CallT&& call);
- Pipeline::Ptr construct();
- template <typename K, typename V>
- using M = std::unordered_map<K, V>;
- struct State {
- struct NodeEdges {
- std::vector<Edge> input_edges;
- std::vector<Edge> output_edges;
- };
- M<std::string, Node::Ptr> calls_map;
- std::vector<Node::Ptr> all_calls;
- cv::gapi::GNetPackage networks;
- cv::gapi::GKernelPackage kernels;
- cv::GCompileArgs compile_args;
- cv::gapi::wip::IStreamSource::Ptr src;
- PLMode mode = PLMode::STREAMING;
- std::string name;
- };
- std::unique_ptr<State> m_state;
- };
- PipelineBuilder::PipelineBuilder() : m_state(new State{}) { };
- void PipelineBuilder::addDummy(const std::string& name,
- const double time,
- const OutputDescr& output) {
- m_state->kernels.include<DummyCall::GCPUDummy>();
- addCall(name, DummyCall{time, output});
- }
- template <typename CallT>
- void PipelineBuilder::addCall(const std::string& name,
- CallT&& call) {
- size_t num_inputs = call.numInputs();
- size_t num_outputs = call.numOutputs();
- Node::Ptr call_node(new Node{{},{},Node::Kind{CallNode{name, std::move(call)}}});
- // NB: Create placeholders for inputs.
- call_node->in_nodes.resize(num_inputs);
- // NB: Create outputs with empty data.
- for (size_t i = 0; i < num_outputs; ++i) {
- call_node->out_nodes.emplace_back(new Node{{call_node},
- {},
- Node::Kind{DataNode{}}});
- }
- auto it = m_state->calls_map.find(name);
- if (it != m_state->calls_map.end()) {
- throw std::logic_error("Node: " + name + " already exists!");
- }
- m_state->calls_map.emplace(name, call_node);
- m_state->all_calls.emplace_back(call_node);
- }
- void PipelineBuilder::addInfer(const std::string& name,
- const InferParams& params) {
- // NB: No default ctor for Params.
- std::unique_ptr<cv::gapi::ie::Params<cv::gapi::Generic>> pp;
- if (cv::util::holds_alternative<LoadPath>(params.path)) {
- auto load_path = cv::util::get<LoadPath>(params.path);
- pp.reset(new cv::gapi::ie::Params<cv::gapi::Generic>(name,
- load_path.xml,
- load_path.bin,
- params.device));
- } else {
- GAPI_Assert(cv::util::holds_alternative<ImportPath>(params.path));
- auto import_path = cv::util::get<ImportPath>(params.path);
- pp.reset(new cv::gapi::ie::Params<cv::gapi::Generic>(name,
- import_path.blob,
- params.device));
- }
- pp->pluginConfig(params.config);
- m_state->networks += cv::gapi::networks(*pp);
- addCall(name, InferCall{name, params.input_layers, params.output_layers});
- }
- void PipelineBuilder::addEdge(const Edge& edge) {
- const auto& src_it = m_state->calls_map.find(edge.src.name);
- if (src_it == m_state->calls_map.end()) {
- throw std::logic_error("Failed to find node: " + edge.src.name);
- }
- auto src_node = src_it->second;
- if (src_node->out_nodes.size() <= edge.src.port) {
- throw std::logic_error("Failed to access node: " + edge.src.name +
- " by out port: " + std::to_string(edge.src.port));
- }
- auto dst_it = m_state->calls_map.find(edge.dst.name);
- if (dst_it == m_state->calls_map.end()) {
- throw std::logic_error("Failed to find node: " + edge.dst.name);
- }
- auto dst_node = dst_it->second;
- if (dst_node->in_nodes.size() <= edge.dst.port) {
- throw std::logic_error("Failed to access node: " + edge.dst.name +
- " by in port: " + std::to_string(edge.dst.port));
- }
- auto out_data = src_node->out_nodes[edge.src.port];
- auto& in_data = dst_node->in_nodes[edge.dst.port];
- // NB: in_data != nullptr.
- if (!in_data.expired()) {
- throw std::logic_error("Node: " + edge.dst.name +
- " already connected by in port: " +
- std::to_string(edge.dst.port));
- }
- dst_node->in_nodes[edge.dst.port] = out_data;
- out_data->out_nodes.push_back(dst_node);
- }
- void PipelineBuilder::setSource(const std::string& name,
- double latency,
- const OutputDescr& output) {
- GAPI_Assert(!m_state->src);
- m_state->src = std::make_shared<DummySource>(latency, output);
- addCall(name, SourceCall{});
- }
- void PipelineBuilder::setMode(PLMode mode) {
- m_state->mode = mode;
- }
- void PipelineBuilder::setDumpFilePath(const std::string& dump) {
- m_state->compile_args.emplace_back(cv::graph_dump_path{dump});
- }
- void PipelineBuilder::setQueueCapacity(const size_t qc) {
- m_state->compile_args.emplace_back(cv::gapi::streaming::queue_capacity{qc});
- }
- void PipelineBuilder::setName(const std::string& name) {
- m_state->name = name;
- }
- static bool visit(Node::Ptr node,
- std::vector<Node::Ptr>& sorted,
- std::unordered_map<Node::Ptr, int>& visited) {
- if (!node) {
- throw std::logic_error("Found null node");
- }
- visited[node] = 1;
- for (auto in : node->in_nodes) {
- auto in_node = in.lock();
- if (visited[in_node] == 0) {
- if (visit(in_node, sorted, visited)) {
- return true;
- }
- } else if (visited[in_node] == 1) {
- return true;
- }
- }
- visited[node] = 2;
- sorted.push_back(node);
- return false;
- }
- static cv::optional<std::vector<Node::Ptr>>
- toposort(const std::vector<Node::Ptr> nodes) {
- std::vector<Node::Ptr> sorted;
- std::unordered_map<Node::Ptr, int> visited;
- for (auto n : nodes) {
- if (visit(n, sorted, visited)) {
- return cv::optional<std::vector<Node::Ptr>>{};
- }
- }
- return cv::util::make_optional(sorted);
- }
- Pipeline::Ptr PipelineBuilder::construct() {
- // NB: Unlike G-API, pipeline_builder_tool graph always starts with CALL node
- // (not data) that produce datas, so the call node which doesn't have
- // inputs is considered as "producer" node.
- //
- // Graph always starts with CALL node and ends with DATA node.
- // Graph example: [source] -> (source:0) -> [PP] -> (PP:0)
- //
- // The algorithm is quite simple:
- // 0. Verify that every call input node exists (connected).
- // 1. Sort all nodes by visiting only call nodes,
- // since there is no data nodes that's not connected with any call node,
- // it's guarantee that every node will be visited.
- // 2. Fillter call nodes.
- // 3. Go through every call node.
- // FIXME: Add toposort in case user passed nodes
- // in arbitrary order which is unlikely happened.
- // 4. Extract proto input from every input node
- // 5. Run call and get outputs
- // 6. If call node doesn't have inputs it means that it's "producer" node,
- // so collect all outputs to graph_inputs vector.
- // 7. Assign proto outputs to output data nodes,
- // so the next calls can use them as inputs.
- cv::GProtoArgs graph_inputs;
- cv::GProtoArgs graph_outputs;
- // 0. Verify that every call input node exists (connected).
- for (auto call_node : m_state->all_calls) {
- for (size_t i = 0; i < call_node->in_nodes.size(); ++i) {
- const auto& in_data_node = call_node->in_nodes[i];
- // NB: in_data_node == nullptr.
- if (in_data_node.expired()) {
- const auto& call = cv::util::get<CallNode>(call_node->kind);
- throw std::logic_error(
- "Node: " + call.name + " in Pipeline: " + m_state->name +
- " has dangling input by in port: " + std::to_string(i));
- }
- }
- }
- // (0) Sort all nodes;
- auto has_sorted = toposort(m_state->all_calls);
- if (!has_sorted) {
- throw std::logic_error(
- "Pipeline: " + m_state->name + " has cyclic dependencies") ;
- }
- auto& sorted = has_sorted.value();
- // (1). Fillter call nodes.
- std::vector<Node::Ptr> sorted_calls;
- for (auto n : sorted) {
- if (cv::util::holds_alternative<CallNode>(n->kind)) {
- sorted_calls.push_back(n);
- }
- }
- // (2). Go through every call node.
- for (auto call_node : sorted_calls) {
- cv::GProtoArgs outputs;
- cv::GProtoArgs inputs;
- for (size_t i = 0; i < call_node->in_nodes.size(); ++i) {
- auto in_node = call_node->in_nodes.at(i);
- auto in_data = cv::util::get<DataNode>(in_node.lock()->kind);
- if (!in_data.arg.has_value()) {
- throw std::logic_error("data hasn't been provided");
- }
- // (3). Extract proto input from every input node.
- inputs.push_back(in_data.arg.value());
- }
- // (4). Run call and get outputs.
- auto call = cv::util::get<CallNode>(call_node->kind);
- call.run(inputs, outputs);
- // (5) If call node doesn't have inputs
- // it means that it's input producer node (Source).
- if (call_node->in_nodes.empty()) {
- for (auto out : outputs) {
- graph_inputs.push_back(out);
- }
- }
- // (6). Assign proto outputs to output data nodes,
- // so the next calls can use them as inputs.
- GAPI_Assert(outputs.size() == call_node->out_nodes.size());
- for (size_t i = 0; i < outputs.size(); ++i) {
- auto out_node = call_node->out_nodes[i];
- auto& out_data = cv::util::get<DataNode>(out_node->kind);
- out_data.arg = cv::util::make_optional(outputs[i]);
- if (out_node->out_nodes.empty()) {
- graph_outputs.push_back(out_data.arg.value());
- }
- }
- }
- m_state->compile_args.emplace_back(m_state->networks);
- m_state->compile_args.emplace_back(m_state->kernels);
- if (m_state->mode == PLMode::STREAMING) {
- GAPI_Assert(graph_inputs.size() == 1);
- GAPI_Assert(cv::util::holds_alternative<cv::GMat>(graph_inputs[0]));
- // FIXME: Handle GFrame when NV12 comes.
- const auto& graph_input = cv::util::get<cv::GMat>(graph_inputs[0]);
- // NB: In case streaming mode need to expose timestamp in order to
- // calculate performance metrics.
- graph_outputs.emplace_back(
- cv::gapi::streaming::timestamp(graph_input).strip());
- return std::make_shared<StreamingPipeline>(std::move(m_state->name),
- cv::GComputation(
- cv::GProtoInputArgs{graph_inputs},
- cv::GProtoOutputArgs{graph_outputs}),
- std::move(m_state->src),
- std::move(m_state->compile_args),
- graph_outputs.size());
- }
- GAPI_Assert(m_state->mode == PLMode::REGULAR);
- return std::make_shared<RegularPipeline>(std::move(m_state->name),
- cv::GComputation(
- cv::GProtoInputArgs{graph_inputs},
- cv::GProtoOutputArgs{graph_outputs}),
- std::move(m_state->src),
- std::move(m_state->compile_args),
- graph_outputs.size());
- }
- Pipeline::Ptr PipelineBuilder::build() {
- auto pipeline = construct();
- m_state.reset(new State{});
- return pipeline;
- }
- #endif // OPENCV_GAPI_PIPELINE_MODELING_TOOL_PIPELINE_BUILDER_HPP
|