// pipeline_modeling_tool.cpp
#include <cstdlib>
#include <exception>
#include <fstream>
#include <iostream>
#include <map>
#include <sstream>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include <opencv2/gapi.hpp>
#include <opencv2/highgui.hpp> // cv::CommandLineParser
#include <opencv2/core/utils/filesystem.hpp>

#if defined(_WIN32)
#include <windows.h>
#endif

#include "pipeline_modeling_tool/dummy_source.hpp"
#include "pipeline_modeling_tool/utils.hpp"
#include "pipeline_modeling_tool/pipeline_builder.hpp"
// Application-level execution mode (selected via --app_mode):
//  - REALTIME:  sources honour their configured latency.
//  - BENCHMARK: sources run with zero latency (see main(), where latency
//               is forced to 0.0 in this mode) to measure peak throughput.
enum class AppMode {
    REALTIME,
    BENCHMARK
};
  20. static AppMode strToAppMode(const std::string& mode_str) {
  21. if (mode_str == "realtime") {
  22. return AppMode::REALTIME;
  23. } else if (mode_str == "benchmark") {
  24. return AppMode::BENCHMARK;
  25. } else {
  26. throw std::logic_error("Unsupported AppMode: " + mode_str +
  27. "\nPlease chose between: realtime and benchmark");
  28. }
  29. }
  30. template <typename T>
  31. T read(const cv::FileNode& node) {
  32. return static_cast<T>(node);
  33. }
  34. static cv::FileNode check_and_get_fn(const cv::FileNode& fn,
  35. const std::string& field,
  36. const std::string& uplvl) {
  37. const bool is_map = fn.isMap();
  38. if (!is_map || fn[field].empty()) {
  39. throw std::logic_error(uplvl + " must contain field: " + field);
  40. }
  41. return fn[field];
  42. }
  43. static cv::FileNode check_and_get_fn(const cv::FileStorage& fs,
  44. const std::string& field,
  45. const std::string& uplvl) {
  46. auto fn = fs[field];
  47. if (fn.empty()) {
  48. throw std::logic_error(uplvl + " must contain field: " + field);
  49. }
  50. return fn;
  51. }
  52. template <typename T, typename FileT>
  53. T check_and_read(const FileT& f,
  54. const std::string& field,
  55. const std::string& uplvl) {
  56. auto fn = check_and_get_fn(f, field, uplvl);
  57. return read<T>(fn);
  58. }
  59. template <typename T>
  60. cv::optional<T> readOpt(const cv::FileNode& fn) {
  61. return fn.empty() ? cv::optional<T>() : cv::optional<T>(read<T>(fn));
  62. }
  63. template <typename T>
  64. std::vector<T> readList(const cv::FileNode& fn,
  65. const std::string& field,
  66. const std::string& uplvl) {
  67. auto fn_field = check_and_get_fn(fn, field, uplvl);
  68. if (!fn_field.isSeq()) {
  69. throw std::logic_error(field + " in " + uplvl + " must be a sequence");
  70. }
  71. std::vector<T> vec;
  72. for (auto iter : fn_field) {
  73. vec.push_back(read<T>(iter));
  74. }
  75. return vec;
  76. }
  77. template <typename T>
  78. std::vector<T> readVec(const cv::FileNode& fn,
  79. const std::string& field,
  80. const std::string& uplvl) {
  81. auto fn_field = check_and_get_fn(fn, field, uplvl);
  82. std::vector<T> vec;
  83. fn_field >> vec;
  84. return vec;
  85. }
  86. static int strToPrecision(const std::string& precision) {
  87. static std::unordered_map<std::string, int> str_to_precision = {
  88. {"U8", CV_8U}, {"FP32", CV_32F}, {"FP16", CV_16F}
  89. };
  90. auto it = str_to_precision.find(precision);
  91. if (it == str_to_precision.end()) {
  92. throw std::logic_error("Unsupported precision: " + precision);
  93. }
  94. return it->second;
  95. }
  96. template <>
  97. OutputDescr read<OutputDescr>(const cv::FileNode& fn) {
  98. auto dims = readVec<int>(fn, "dims", "output");
  99. auto str_prec = check_and_read<std::string>(fn, "precision", "output");
  100. return OutputDescr{dims, strToPrecision(str_prec)};
  101. }
  102. template <>
  103. Edge read<Edge>(const cv::FileNode& fn) {
  104. auto from = check_and_read<std::string>(fn, "from", "edge");
  105. auto to = check_and_read<std::string>(fn, "to", "edge");
  106. auto splitNameAndPort = [](const std::string& str) {
  107. auto pos = str.find(':');
  108. auto name =
  109. pos == std::string::npos ? str : std::string(str.c_str(), pos);
  110. size_t port =
  111. pos == std::string::npos ? 0 : std::atoi(str.c_str() + pos + 1);
  112. return std::make_pair(name, port);
  113. };
  114. auto p1 = splitNameAndPort(from);
  115. auto p2 = splitNameAndPort(to);
  116. return Edge{Edge::P{p1.first, p1.second}, Edge::P{p2.first, p2.second}};
  117. }
  118. static std::string getModelsPath() {
  119. static char* models_path_c = std::getenv("PIPELINE_MODELS_PATH");
  120. static std::string models_path = models_path_c ? models_path_c : ".";
  121. return models_path;
  122. }
// Reads an OpenVINO model location from 'fn'. Two formats are accepted:
// an xml/bin pair (LoadPath) or a single precompiled blob (ImportPath).
// Relative paths are joined with the models root (see getModelsPath).
// Throws std::logic_error with usage help if neither format is present.
template <>
ModelPath read<ModelPath>(const cv::FileNode& fn) {
    using cv::utils::fs::join;
    if (!fn["xml"].empty() && !fn["bin"].empty()) {
        return ModelPath{LoadPath{join(getModelsPath(), fn["xml"].string()),
                                  join(getModelsPath(), fn["bin"].string())}};
    } else if (!fn["blob"].empty()){
        return ModelPath{ImportPath{join(getModelsPath(), fn["blob"].string())}};
    } else {
        // NB: Raw string is user-facing help text - keep it verbatim.
        const std::string emsg = R""""(
Path to OpenVINO model must be specified in either of two formats:
1.
xml: path to *.xml
bin: path to *.bin
2.
blob: path to *.blob
)"""";
        throw std::logic_error(emsg);
    }
}
  143. static PLMode strToPLMode(const std::string& mode_str) {
  144. if (mode_str == "streaming") {
  145. return PLMode::STREAMING;
  146. } else if (mode_str == "regular") {
  147. return PLMode::REGULAR;
  148. } else {
  149. throw std::logic_error("Unsupported PLMode: " + mode_str +
  150. "\nPlease chose between: streaming and regular");
  151. }
  152. }
  153. static std::vector<std::string> parseExecList(const std::string& exec_list) {
  154. std::vector<std::string> pl_types;
  155. std::stringstream ss(exec_list);
  156. std::string pl_type;
  157. while (getline(ss, pl_type, ',')) {
  158. pl_types.push_back(pl_type);
  159. }
  160. return pl_types;
  161. }
  162. static void loadConfig(const std::string& filename,
  163. std::map<std::string, std::string>& config) {
  164. cv::FileStorage fs(filename, cv::FileStorage::READ);
  165. if (!fs.isOpened()) {
  166. throw std::runtime_error("Failed to load config: " + filename);
  167. }
  168. cv::FileNode root = fs.root();
  169. for (auto it = root.begin(); it != root.end(); ++it) {
  170. auto device = *it;
  171. if (!device.isMap()) {
  172. throw std::runtime_error("Failed to parse config: " + filename);
  173. }
  174. for (auto item : device) {
  175. config.emplace(item.name(), item.string());
  176. }
  177. }
  178. }
  179. int main(int argc, char* argv[]) {
  180. #if defined(_WIN32)
  181. timeBeginPeriod(1);
  182. #endif
  183. try {
  184. const std::string keys =
  185. "{ h help | | Print this help message. }"
  186. "{ cfg | | Path to the config which is either"
  187. " YAML file or string. }"
  188. "{ load_config | | Optional. Path to XML/YAML/JSON file"
  189. " to load custom IE parameters. }"
  190. "{ cache_dir | | Optional. Enables caching of loaded models"
  191. " to specified directory. }"
  192. "{ log_file | | Optional. If file is specified, app will"
  193. " dump expanded execution information. }"
  194. "{ pl_mode | streaming | Optional. Pipeline mode: streaming/regular"
  195. " if it's specified will be applied for"
  196. " every pipeline. }"
  197. "{ qc | 1 | Optional. Calculated automatically by G-API"
  198. " if set to 0. If it's specified will be"
  199. " applied for every pipeline. }"
  200. "{ app_mode | realtime | Application mode (realtime/benchmark). }"
  201. "{ exec_list | | A comma-separated list of pipelines that"
  202. " will be executed. Spaces around commas"
  203. " are prohibited. }";
  204. cv::CommandLineParser cmd(argc, argv, keys);
  205. if (cmd.has("help")) {
  206. cmd.printMessage();
  207. return 0;
  208. }
  209. const auto cfg = cmd.get<std::string>("cfg");
  210. const auto load_config = cmd.get<std::string>("load_config");
  211. const auto cached_dir = cmd.get<std::string>("cache_dir");
  212. const auto log_file = cmd.get<std::string>("log_file");
  213. const auto pl_mode = strToPLMode(cmd.get<std::string>("pl_mode"));
  214. const auto qc = cmd.get<int>("qc");
  215. const auto app_mode = strToAppMode(cmd.get<std::string>("app_mode"));
  216. const auto exec_str = cmd.get<std::string>("exec_list");
  217. cv::FileStorage fs;
  218. if (cfg.empty()) {
  219. throw std::logic_error("Config must be specified via --cfg option");
  220. }
  221. // NB: *.yml
  222. if (cfg.size() < 5) {
  223. throw std::logic_error("--cfg string must contain at least 5 symbols"
  224. " to determine if it's a file (*.yml) a or string");
  225. }
  226. if (cfg.substr(cfg.size() - 4, cfg.size()) == ".yml") {
  227. if (!fs.open(cfg, cv::FileStorage::READ)) {
  228. throw std::logic_error("Failed to open config file: " + cfg);
  229. }
  230. } else {
  231. fs = cv::FileStorage(cfg, cv::FileStorage::FORMAT_YAML |
  232. cv::FileStorage::MEMORY);
  233. }
  234. std::map<std::string, std::string> config;
  235. if (!load_config.empty()) {
  236. loadConfig(load_config, config);
  237. }
  238. // NB: Takes priority over config from file
  239. if (!cached_dir.empty()) {
  240. config =
  241. std::map<std::string, std::string>{{"CACHE_DIR", cached_dir}};
  242. }
  243. const double work_time_ms =
  244. check_and_read<double>(fs, "work_time", "Config");
  245. if (work_time_ms < 0) {
  246. throw std::logic_error("work_time must be positive");
  247. }
  248. auto pipelines_fn = check_and_get_fn(fs, "Pipelines", "Config");
  249. if (!pipelines_fn.isMap()) {
  250. throw std::logic_error("Pipelines field must be a map");
  251. }
  252. auto exec_list = !exec_str.empty() ? parseExecList(exec_str)
  253. : pipelines_fn.keys();
  254. std::vector<Pipeline::Ptr> pipelines;
  255. pipelines.reserve(exec_list.size());
  256. // NB: Build pipelines based on config information
  257. PipelineBuilder builder;
  258. for (const auto& name : exec_list) {
  259. const auto& pl_fn = check_and_get_fn(pipelines_fn, name, "Pipelines");
  260. builder.setName(name);
  261. // NB: Set source
  262. {
  263. const auto& src_fn = check_and_get_fn(pl_fn, "source", name);
  264. auto src_name =
  265. check_and_read<std::string>(src_fn, "name", "source");
  266. auto latency =
  267. check_and_read<double>(src_fn, "latency", "source");
  268. auto output =
  269. check_and_read<OutputDescr>(src_fn, "output", "source");
  270. // NB: In case BENCHMARK mode sources work with zero latency.
  271. if (app_mode == AppMode::BENCHMARK) {
  272. latency = 0.0;
  273. }
  274. builder.setSource(src_name, latency, output);
  275. }
  276. const auto& nodes_fn = check_and_get_fn(pl_fn, "nodes", name);
  277. if (!nodes_fn.isSeq()) {
  278. throw std::logic_error("nodes in " + name + " must be a sequence");
  279. }
  280. for (auto node_fn : nodes_fn) {
  281. auto node_name =
  282. check_and_read<std::string>(node_fn, "name", "node");
  283. auto node_type =
  284. check_and_read<std::string>(node_fn, "type", "node");
  285. if (node_type == "Dummy") {
  286. auto time =
  287. check_and_read<double>(node_fn, "time", node_name);
  288. if (time < 0) {
  289. throw std::logic_error(node_name + " time must be positive");
  290. }
  291. auto output =
  292. check_and_read<OutputDescr>(node_fn, "output", node_name);
  293. builder.addDummy(node_name, time, output);
  294. } else if (node_type == "Infer") {
  295. InferParams params;
  296. params.path = read<ModelPath>(node_fn);
  297. params.device =
  298. check_and_read<std::string>(node_fn, "device", node_name);
  299. params.input_layers =
  300. readList<std::string>(node_fn, "input_layers", node_name);
  301. params.output_layers =
  302. readList<std::string>(node_fn, "output_layers", node_name);
  303. params.config = config;
  304. builder.addInfer(node_name, params);
  305. } else {
  306. throw std::logic_error("Unsupported node type: " + node_type);
  307. }
  308. }
  309. const auto edges_fn = check_and_get_fn(pl_fn, "edges", name);
  310. if (!edges_fn.isSeq()) {
  311. throw std::logic_error("edges in " + name + " must be a sequence");
  312. }
  313. for (auto edge_fn : edges_fn) {
  314. auto edge = read<Edge>(edge_fn);
  315. builder.addEdge(edge);
  316. }
  317. // NB: Pipeline mode from config takes priority over cmd.
  318. auto mode = readOpt<std::string>(pl_fn["mode"]);
  319. builder.setMode(mode.has_value() ? strToPLMode(mode.value()) : pl_mode);
  320. // NB: Queue capacity from config takes priority over cmd.
  321. auto config_qc = readOpt<int>(pl_fn["queue_capacity"]);
  322. auto queue_capacity = config_qc.has_value() ? config_qc.value() : qc;
  323. // NB: 0 is special constant that means
  324. // queue capacity should be calculated automatically.
  325. if (queue_capacity != 0) {
  326. builder.setQueueCapacity(queue_capacity);
  327. }
  328. auto dump = readOpt<std::string>(pl_fn["dump"]);
  329. if (dump) {
  330. builder.setDumpFilePath(dump.value());
  331. }
  332. pipelines.emplace_back(builder.build());
  333. }
  334. // NB: Compille pipelines
  335. for (size_t i = 0; i < pipelines.size(); ++i) {
  336. pipelines[i]->compile();
  337. }
  338. // NB: Execute pipelines
  339. std::vector<std::exception_ptr> eptrs(pipelines.size(), nullptr);
  340. std::vector<std::thread> threads(pipelines.size());
  341. for (size_t i = 0; i < pipelines.size(); ++i) {
  342. threads[i] = std::thread([&, i]() {
  343. try {
  344. pipelines[i]->run(work_time_ms);
  345. } catch (...) {
  346. eptrs[i] = std::current_exception();
  347. }
  348. });
  349. }
  350. std::ofstream file;
  351. if (!log_file.empty()) {
  352. file.open(log_file);
  353. }
  354. for (size_t i = 0; i < threads.size(); ++i) {
  355. threads[i].join();
  356. }
  357. for (size_t i = 0; i < threads.size(); ++i) {
  358. if (eptrs[i] != nullptr) {
  359. try {
  360. std::rethrow_exception(eptrs[i]);
  361. } catch (std::exception& e) {
  362. throw std::logic_error(pipelines[i]->name() + " failed: " + e.what());
  363. }
  364. }
  365. if (file.is_open()) {
  366. file << pipelines[i]->report().toStr(true) << std::endl;
  367. }
  368. std::cout << pipelines[i]->report().toStr() << std::endl;
  369. }
  370. } catch (const std::exception& e) {
  371. std::cout << e.what() << std::endl;
  372. throw;
  373. }
  374. return 0;
  375. }