pipeline_builder.hpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502
  1. #ifndef OPENCV_GAPI_PIPELINE_MODELING_TOOL_PIPELINE_BUILDER_HPP
  2. #define OPENCV_GAPI_PIPELINE_MODELING_TOOL_PIPELINE_BUILDER_HPP
  3. #include <map>
  4. #include <opencv2/gapi/infer.hpp> // cv::gapi::GNetPackage
  5. #include <opencv2/gapi/streaming/cap.hpp> // cv::gapi::wip::IStreamSource
  6. #include <opencv2/gapi/infer/ie.hpp> // cv::gapi::ie::Params
  7. #include <opencv2/gapi/gcommon.hpp> // cv::gapi::GCompileArgs
  8. #include <opencv2/gapi/cpu/gcpukernel.hpp> // GAPI_OCV_KERNEL
  9. #include <opencv2/gapi/gkernel.hpp> // G_API_OP
  10. #include "pipeline.hpp"
  11. #include "utils.hpp"
  12. struct Edge {
  13. struct P {
  14. std::string name;
  15. size_t port;
  16. };
  17. P src;
  18. P dst;
  19. };
  20. struct CallNode {
  21. using F = std::function<void(const cv::GProtoArgs&, cv::GProtoArgs&)>;
  22. std::string name;
  23. F run;
  24. };
  25. struct DataNode {
  26. cv::optional<cv::GProtoArg> arg;
  27. };
  28. struct Node {
  29. using Ptr = std::shared_ptr<Node>;
  30. using WPtr = std::weak_ptr<Node>;
  31. using Kind = cv::util::variant<CallNode, DataNode>;
  32. std::vector<Node::WPtr> in_nodes;
  33. std::vector<Node::Ptr> out_nodes;
  34. Kind kind;
  35. };
  36. struct DummyCall {
  37. G_API_OP(GDummy,
  38. <cv::GMat(cv::GMat, double, OutputDescr)>,
  39. "custom.dummy") {
  40. static cv::GMatDesc outMeta(const cv::GMatDesc& /* in */,
  41. double /* time */,
  42. const OutputDescr& output) {
  43. if (output.dims.size() == 2) {
  44. return cv::GMatDesc(output.precision,
  45. 1,
  46. cv::Size(output.dims[0], output.dims[1]));
  47. }
  48. return cv::GMatDesc(output.precision, output.dims);
  49. }
  50. };
  51. struct DummyState {
  52. cv::Mat mat;
  53. };
  54. // NB: Generate random mat once and then
  55. // copy to dst buffer on every iteration.
  56. GAPI_OCV_KERNEL_ST(GCPUDummy, GDummy, DummyState) {
  57. static void setup(const cv::GMatDesc& /*in*/,
  58. double /*time*/,
  59. const OutputDescr& output,
  60. std::shared_ptr<DummyState>& state,
  61. const cv::GCompileArgs& /*args*/) {
  62. state.reset(new DummyState{});
  63. utils::createNDMat(state->mat, output.dims, output.precision);
  64. utils::generateRandom(state->mat);
  65. }
  66. static void run(const cv::Mat& /*in_mat*/,
  67. double time,
  68. const OutputDescr& /*output*/,
  69. cv::Mat& out_mat,
  70. DummyState& state) {
  71. using namespace std::chrono;
  72. double total = 0;
  73. auto start = high_resolution_clock::now();
  74. state.mat.copyTo(out_mat);
  75. while (total < time) {
  76. total = duration_cast<duration<double, std::milli>>(
  77. high_resolution_clock::now() - start).count();
  78. }
  79. }
  80. };
  81. void operator()(const cv::GProtoArgs& inputs, cv::GProtoArgs& outputs);
  82. size_t numInputs() const { return 1; }
  83. size_t numOutputs() const { return 1; }
  84. double time;
  85. OutputDescr output;
  86. };
  87. void DummyCall::operator()(const cv::GProtoArgs& inputs,
  88. cv::GProtoArgs& outputs) {
  89. GAPI_Assert(inputs.size() == 1u);
  90. GAPI_Assert(cv::util::holds_alternative<cv::GMat>(inputs[0]));
  91. GAPI_Assert(outputs.empty());
  92. auto in = cv::util::get<cv::GMat>(inputs[0]);
  93. outputs.emplace_back(GDummy::on(in, time, output));
  94. }
  95. struct InferCall {
  96. void operator()(const cv::GProtoArgs& inputs, cv::GProtoArgs& outputs);
  97. size_t numInputs() const { return input_layers.size(); }
  98. size_t numOutputs() const { return output_layers.size(); }
  99. std::string tag;
  100. std::vector<std::string> input_layers;
  101. std::vector<std::string> output_layers;
  102. };
  103. void InferCall::operator()(const cv::GProtoArgs& inputs,
  104. cv::GProtoArgs& outputs) {
  105. GAPI_Assert(inputs.size() == input_layers.size());
  106. GAPI_Assert(outputs.empty());
  107. cv::GInferInputs g_inputs;
  108. // TODO: Add an opportunity not specify input/output layers in case
  109. // there is only single layer.
  110. for (size_t i = 0; i < inputs.size(); ++i) {
  111. // TODO: Support GFrame as well.
  112. GAPI_Assert(cv::util::holds_alternative<cv::GMat>(inputs[i]));
  113. auto in = cv::util::get<cv::GMat>(inputs[i]);
  114. g_inputs[input_layers[i]] = in;
  115. }
  116. auto g_outputs = cv::gapi::infer<cv::gapi::Generic>(tag, g_inputs);
  117. for (size_t i = 0; i < output_layers.size(); ++i) {
  118. outputs.emplace_back(g_outputs.at(output_layers[i]));
  119. }
  120. }
  121. struct SourceCall {
  122. void operator()(const cv::GProtoArgs& inputs, cv::GProtoArgs& outputs);
  123. size_t numInputs() const { return 0; }
  124. size_t numOutputs() const { return 1; }
  125. };
  126. void SourceCall::operator()(const cv::GProtoArgs& inputs,
  127. cv::GProtoArgs& outputs) {
  128. GAPI_Assert(inputs.empty());
  129. GAPI_Assert(outputs.empty());
  130. // NB: Since NV12 isn't exposed source always produce GMat.
  131. outputs.emplace_back(cv::GMat());
  132. }
  133. struct LoadPath {
  134. std::string xml;
  135. std::string bin;
  136. };
  137. struct ImportPath {
  138. std::string blob;
  139. };
  140. using ModelPath = cv::util::variant<ImportPath, LoadPath>;
  141. struct InferParams {
  142. std::string name;
  143. ModelPath path;
  144. std::string device;
  145. std::vector<std::string> input_layers;
  146. std::vector<std::string> output_layers;
  147. std::map<std::string, std::string> config;
  148. };
  149. class PipelineBuilder {
  150. public:
  151. PipelineBuilder();
  152. void addDummy(const std::string& name,
  153. const double time,
  154. const OutputDescr& output);
  155. void addInfer(const std::string& name, const InferParams& params);
  156. void setSource(const std::string& name,
  157. double latency,
  158. const OutputDescr& output);
  159. void addEdge(const Edge& edge);
  160. void setMode(PLMode mode);
  161. void setDumpFilePath(const std::string& dump);
  162. void setQueueCapacity(const size_t qc);
  163. void setName(const std::string& name);
  164. Pipeline::Ptr build();
  165. private:
  166. template <typename CallT>
  167. void addCall(const std::string& name,
  168. CallT&& call);
  169. Pipeline::Ptr construct();
  170. template <typename K, typename V>
  171. using M = std::unordered_map<K, V>;
  172. struct State {
  173. struct NodeEdges {
  174. std::vector<Edge> input_edges;
  175. std::vector<Edge> output_edges;
  176. };
  177. M<std::string, Node::Ptr> calls_map;
  178. std::vector<Node::Ptr> all_calls;
  179. cv::gapi::GNetPackage networks;
  180. cv::gapi::GKernelPackage kernels;
  181. cv::GCompileArgs compile_args;
  182. cv::gapi::wip::IStreamSource::Ptr src;
  183. PLMode mode = PLMode::STREAMING;
  184. std::string name;
  185. };
  186. std::unique_ptr<State> m_state;
  187. };
  188. PipelineBuilder::PipelineBuilder() : m_state(new State{}) { };
  189. void PipelineBuilder::addDummy(const std::string& name,
  190. const double time,
  191. const OutputDescr& output) {
  192. m_state->kernels.include<DummyCall::GCPUDummy>();
  193. addCall(name, DummyCall{time, output});
  194. }
  195. template <typename CallT>
  196. void PipelineBuilder::addCall(const std::string& name,
  197. CallT&& call) {
  198. size_t num_inputs = call.numInputs();
  199. size_t num_outputs = call.numOutputs();
  200. Node::Ptr call_node(new Node{{},{},Node::Kind{CallNode{name, std::move(call)}}});
  201. // NB: Create placeholders for inputs.
  202. call_node->in_nodes.resize(num_inputs);
  203. // NB: Create outputs with empty data.
  204. for (size_t i = 0; i < num_outputs; ++i) {
  205. call_node->out_nodes.emplace_back(new Node{{call_node},
  206. {},
  207. Node::Kind{DataNode{}}});
  208. }
  209. auto it = m_state->calls_map.find(name);
  210. if (it != m_state->calls_map.end()) {
  211. throw std::logic_error("Node: " + name + " already exists!");
  212. }
  213. m_state->calls_map.emplace(name, call_node);
  214. m_state->all_calls.emplace_back(call_node);
  215. }
  216. void PipelineBuilder::addInfer(const std::string& name,
  217. const InferParams& params) {
  218. // NB: No default ctor for Params.
  219. std::unique_ptr<cv::gapi::ie::Params<cv::gapi::Generic>> pp;
  220. if (cv::util::holds_alternative<LoadPath>(params.path)) {
  221. auto load_path = cv::util::get<LoadPath>(params.path);
  222. pp.reset(new cv::gapi::ie::Params<cv::gapi::Generic>(name,
  223. load_path.xml,
  224. load_path.bin,
  225. params.device));
  226. } else {
  227. GAPI_Assert(cv::util::holds_alternative<ImportPath>(params.path));
  228. auto import_path = cv::util::get<ImportPath>(params.path);
  229. pp.reset(new cv::gapi::ie::Params<cv::gapi::Generic>(name,
  230. import_path.blob,
  231. params.device));
  232. }
  233. pp->pluginConfig(params.config);
  234. m_state->networks += cv::gapi::networks(*pp);
  235. addCall(name, InferCall{name, params.input_layers, params.output_layers});
  236. }
  237. void PipelineBuilder::addEdge(const Edge& edge) {
  238. const auto& src_it = m_state->calls_map.find(edge.src.name);
  239. if (src_it == m_state->calls_map.end()) {
  240. throw std::logic_error("Failed to find node: " + edge.src.name);
  241. }
  242. auto src_node = src_it->second;
  243. if (src_node->out_nodes.size() <= edge.src.port) {
  244. throw std::logic_error("Failed to access node: " + edge.src.name +
  245. " by out port: " + std::to_string(edge.src.port));
  246. }
  247. auto dst_it = m_state->calls_map.find(edge.dst.name);
  248. if (dst_it == m_state->calls_map.end()) {
  249. throw std::logic_error("Failed to find node: " + edge.dst.name);
  250. }
  251. auto dst_node = dst_it->second;
  252. if (dst_node->in_nodes.size() <= edge.dst.port) {
  253. throw std::logic_error("Failed to access node: " + edge.dst.name +
  254. " by in port: " + std::to_string(edge.dst.port));
  255. }
  256. auto out_data = src_node->out_nodes[edge.src.port];
  257. auto& in_data = dst_node->in_nodes[edge.dst.port];
  258. // NB: in_data != nullptr.
  259. if (!in_data.expired()) {
  260. throw std::logic_error("Node: " + edge.dst.name +
  261. " already connected by in port: " +
  262. std::to_string(edge.dst.port));
  263. }
  264. dst_node->in_nodes[edge.dst.port] = out_data;
  265. out_data->out_nodes.push_back(dst_node);
  266. }
  267. void PipelineBuilder::setSource(const std::string& name,
  268. double latency,
  269. const OutputDescr& output) {
  270. GAPI_Assert(!m_state->src);
  271. m_state->src = std::make_shared<DummySource>(latency, output);
  272. addCall(name, SourceCall{});
  273. }
  274. void PipelineBuilder::setMode(PLMode mode) {
  275. m_state->mode = mode;
  276. }
  277. void PipelineBuilder::setDumpFilePath(const std::string& dump) {
  278. m_state->compile_args.emplace_back(cv::graph_dump_path{dump});
  279. }
  280. void PipelineBuilder::setQueueCapacity(const size_t qc) {
  281. m_state->compile_args.emplace_back(cv::gapi::streaming::queue_capacity{qc});
  282. }
  283. void PipelineBuilder::setName(const std::string& name) {
  284. m_state->name = name;
  285. }
  286. static bool visit(Node::Ptr node,
  287. std::vector<Node::Ptr>& sorted,
  288. std::unordered_map<Node::Ptr, int>& visited) {
  289. if (!node) {
  290. throw std::logic_error("Found null node");
  291. }
  292. visited[node] = 1;
  293. for (auto in : node->in_nodes) {
  294. auto in_node = in.lock();
  295. if (visited[in_node] == 0) {
  296. if (visit(in_node, sorted, visited)) {
  297. return true;
  298. }
  299. } else if (visited[in_node] == 1) {
  300. return true;
  301. }
  302. }
  303. visited[node] = 2;
  304. sorted.push_back(node);
  305. return false;
  306. }
  307. static cv::optional<std::vector<Node::Ptr>>
  308. toposort(const std::vector<Node::Ptr> nodes) {
  309. std::vector<Node::Ptr> sorted;
  310. std::unordered_map<Node::Ptr, int> visited;
  311. for (auto n : nodes) {
  312. if (visit(n, sorted, visited)) {
  313. return cv::optional<std::vector<Node::Ptr>>{};
  314. }
  315. }
  316. return cv::util::make_optional(sorted);
  317. }
  318. Pipeline::Ptr PipelineBuilder::construct() {
  319. // NB: Unlike G-API, pipeline_builder_tool graph always starts with CALL node
  320. // (not data) that produce datas, so the call node which doesn't have
  321. // inputs is considered as "producer" node.
  322. //
  323. // Graph always starts with CALL node and ends with DATA node.
  324. // Graph example: [source] -> (source:0) -> [PP] -> (PP:0)
  325. //
  326. // The algorithm is quite simple:
  327. // 0. Verify that every call input node exists (connected).
  328. // 1. Sort all nodes by visiting only call nodes,
  329. // since there is no data nodes that's not connected with any call node,
  330. // it's guarantee that every node will be visited.
  331. // 2. Fillter call nodes.
  332. // 3. Go through every call node.
  333. // FIXME: Add toposort in case user passed nodes
  334. // in arbitrary order which is unlikely happened.
  335. // 4. Extract proto input from every input node
  336. // 5. Run call and get outputs
  337. // 6. If call node doesn't have inputs it means that it's "producer" node,
  338. // so collect all outputs to graph_inputs vector.
  339. // 7. Assign proto outputs to output data nodes,
  340. // so the next calls can use them as inputs.
  341. cv::GProtoArgs graph_inputs;
  342. cv::GProtoArgs graph_outputs;
  343. // 0. Verify that every call input node exists (connected).
  344. for (auto call_node : m_state->all_calls) {
  345. for (size_t i = 0; i < call_node->in_nodes.size(); ++i) {
  346. const auto& in_data_node = call_node->in_nodes[i];
  347. // NB: in_data_node == nullptr.
  348. if (in_data_node.expired()) {
  349. const auto& call = cv::util::get<CallNode>(call_node->kind);
  350. throw std::logic_error(
  351. "Node: " + call.name + " in Pipeline: " + m_state->name +
  352. " has dangling input by in port: " + std::to_string(i));
  353. }
  354. }
  355. }
  356. // (0) Sort all nodes;
  357. auto has_sorted = toposort(m_state->all_calls);
  358. if (!has_sorted) {
  359. throw std::logic_error(
  360. "Pipeline: " + m_state->name + " has cyclic dependencies") ;
  361. }
  362. auto& sorted = has_sorted.value();
  363. // (1). Fillter call nodes.
  364. std::vector<Node::Ptr> sorted_calls;
  365. for (auto n : sorted) {
  366. if (cv::util::holds_alternative<CallNode>(n->kind)) {
  367. sorted_calls.push_back(n);
  368. }
  369. }
  370. // (2). Go through every call node.
  371. for (auto call_node : sorted_calls) {
  372. cv::GProtoArgs outputs;
  373. cv::GProtoArgs inputs;
  374. for (size_t i = 0; i < call_node->in_nodes.size(); ++i) {
  375. auto in_node = call_node->in_nodes.at(i);
  376. auto in_data = cv::util::get<DataNode>(in_node.lock()->kind);
  377. if (!in_data.arg.has_value()) {
  378. throw std::logic_error("data hasn't been provided");
  379. }
  380. // (3). Extract proto input from every input node.
  381. inputs.push_back(in_data.arg.value());
  382. }
  383. // (4). Run call and get outputs.
  384. auto call = cv::util::get<CallNode>(call_node->kind);
  385. call.run(inputs, outputs);
  386. // (5) If call node doesn't have inputs
  387. // it means that it's input producer node (Source).
  388. if (call_node->in_nodes.empty()) {
  389. for (auto out : outputs) {
  390. graph_inputs.push_back(out);
  391. }
  392. }
  393. // (6). Assign proto outputs to output data nodes,
  394. // so the next calls can use them as inputs.
  395. GAPI_Assert(outputs.size() == call_node->out_nodes.size());
  396. for (size_t i = 0; i < outputs.size(); ++i) {
  397. auto out_node = call_node->out_nodes[i];
  398. auto& out_data = cv::util::get<DataNode>(out_node->kind);
  399. out_data.arg = cv::util::make_optional(outputs[i]);
  400. if (out_node->out_nodes.empty()) {
  401. graph_outputs.push_back(out_data.arg.value());
  402. }
  403. }
  404. }
  405. m_state->compile_args.emplace_back(m_state->networks);
  406. m_state->compile_args.emplace_back(m_state->kernels);
  407. if (m_state->mode == PLMode::STREAMING) {
  408. GAPI_Assert(graph_inputs.size() == 1);
  409. GAPI_Assert(cv::util::holds_alternative<cv::GMat>(graph_inputs[0]));
  410. // FIXME: Handle GFrame when NV12 comes.
  411. const auto& graph_input = cv::util::get<cv::GMat>(graph_inputs[0]);
  412. // NB: In case streaming mode need to expose timestamp in order to
  413. // calculate performance metrics.
  414. graph_outputs.emplace_back(
  415. cv::gapi::streaming::timestamp(graph_input).strip());
  416. return std::make_shared<StreamingPipeline>(std::move(m_state->name),
  417. cv::GComputation(
  418. cv::GProtoInputArgs{graph_inputs},
  419. cv::GProtoOutputArgs{graph_outputs}),
  420. std::move(m_state->src),
  421. std::move(m_state->compile_args),
  422. graph_outputs.size());
  423. }
  424. GAPI_Assert(m_state->mode == PLMode::REGULAR);
  425. return std::make_shared<RegularPipeline>(std::move(m_state->name),
  426. cv::GComputation(
  427. cv::GProtoInputArgs{graph_inputs},
  428. cv::GProtoOutputArgs{graph_outputs}),
  429. std::move(m_state->src),
  430. std::move(m_state->compile_args),
  431. graph_outputs.size());
  432. }
  433. Pipeline::Ptr PipelineBuilder::build() {
  434. auto pipeline = construct();
  435. m_state.reset(new State{});
  436. return pipeline;
  437. }
  438. #endif // OPENCV_GAPI_PIPELINE_MODELING_TOOL_PIPELINE_BUILDER_HPP