conc_queue_tests.cpp 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. // This file is part of OpenCV project.
  2. // It is subject to the license terms in the LICENSE file found in the top-level directory
  3. // of this distribution and at http://opencv.org/license.html.
  4. //
  5. // Copyright (C) 2019 Intel Corporation
  6. #include "../test_precomp.hpp"
  7. #include <unordered_set>
  8. #include <thread>
  9. #include "executor/conc_queue.hpp"
  10. namespace opencv_test
  11. {
  12. using namespace cv::gapi;
  13. TEST(ConcQueue, PushPop)
  14. {
  15. own::concurrent_bounded_queue<int> q;
  16. for (int i = 0; i < 100; i++)
  17. {
  18. q.push(i);
  19. }
  20. for (int i = 0; i < 100; i++)
  21. {
  22. int x;
  23. q.pop(x);
  24. EXPECT_EQ(i, x);
  25. }
  26. }
  27. TEST(ConcQueue, TryPop)
  28. {
  29. own::concurrent_bounded_queue<int> q;
  30. int x = 0;
  31. EXPECT_FALSE(q.try_pop(x));
  32. q.push(1);
  33. EXPECT_TRUE(q.try_pop(x));
  34. EXPECT_EQ(1, x);
  35. }
  36. TEST(ConcQueue, Clear)
  37. {
  38. own::concurrent_bounded_queue<int> q;
  39. for (int i = 0; i < 10; i++)
  40. {
  41. q.push(i);
  42. }
  43. q.clear();
  44. int x = 0;
  45. EXPECT_FALSE(q.try_pop(x));
  46. }
  47. // In this test, every writer thread produces its own range of integer
  48. // numbers, writing those to a shared queue.
  49. //
  50. // Every reader thread pops elements from the queue (until -1 is
  51. // reached) and stores those in its own associated set.
  52. //
  53. // Finally, the master thread waits for completion of all other
  54. // threads and verifies that all the necessary data is
  55. // produced/obtained.
  namespace
  {
  // Parameter tuple of the stress test, in order:
  //   <0> number of writer threads
  //   <1> number of elements produced by every writer
  //   <2> number of reader threads
  //   <3> queue capacity (a zero value leaves the capacity unset)
  using StressParam = std::tuple<int, int, int, std::size_t>;

  // Marker value a writer pushes after its last element; a reader stops
  // as soon as it pops one.
  constexpr int STOP_SIGN{-1};

  // Distance between the starting values of two consecutive writers.
  constexpr int BASE{1000};
  }
  65. struct ConcQueue_: public ::testing::TestWithParam<StressParam>
  66. {
  67. using Q = own::concurrent_bounded_queue<int>;
  68. using S = std::unordered_set<int>;
  69. static void writer(int base, int writes, Q& q)
  70. {
  71. for (int i = 0; i < writes; i++)
  72. {
  73. q.push(base + i);
  74. }
  75. q.push(STOP_SIGN);
  76. }
  77. static void reader(Q& q, S& s)
  78. {
  79. int x = 0;
  80. while (true)
  81. {
  82. q.pop(x);
  83. if (x == STOP_SIGN) return;
  84. s.insert(x);
  85. }
  86. }
  87. };
  88. TEST_P(ConcQueue_, Test)
  89. {
  90. int num_writers = 0;
  91. int num_writes = 0;
  92. int num_readers = 0;
  93. std::size_t capacity = 0u;
  94. std::tie(num_writers, num_writes, num_readers, capacity) = GetParam();
  95. CV_Assert(num_writers < 20);
  96. CV_Assert(num_writes < BASE);
  97. Q q;
  98. if (capacity)
  99. {
  100. // see below (2)
  101. CV_Assert(static_cast<int>(capacity) > (num_writers - num_readers));
  102. q.set_capacity(capacity);
  103. }
  104. // Start reader threads
  105. std::vector<S> storage(num_readers);
  106. std::vector<std::thread> readers;
  107. for (S& s : storage)
  108. {
  109. readers.emplace_back(reader, std::ref(q), std::ref(s));
  110. }
  111. // Start writer threads, also pre-generate reference numbers
  112. S reference;
  113. std::vector<std::thread> writers;
  114. for (int w = 0; w < num_writers; w++)
  115. {
  116. writers.emplace_back(writer, w*BASE, num_writes, std::ref(q));
  117. for (int r = 0; r < num_writes; r++)
  118. {
  119. reference.insert(w*BASE + r);
  120. }
  121. }
  122. // Every writer puts a STOP_SIGN at the end,
  123. // There are three cases:
  124. // 1) num_writers == num_readers
  125. // every reader should get its own STOP_SIGN from any
  126. // of the writers
  127. //
  128. // 2) num_writers > num_readers
  129. // every reader will get a STOP_SIGN but there're more
  130. // STOP_SIGNs may be pushed to the queue - and if this
  131. // number exceeds capacity, writers block (to a deadlock).
  132. // The latter situation must be avoided at parameters level.
  133. // [a] Also not every data produced by writers will be consumed
  134. // by a reader in this case. Master thread will read the rest
  135. //
  136. // 3) num_readers > num_writers
  137. // in this case, some readers will stuck and will never get
  138. // a STOP_SIGN. Master thread will push extra STOP_SIGNs to the
  139. // queue.
  140. // Solution to (2a)
  141. S remnants;
  142. if (num_writers > num_readers)
  143. {
  144. int extra = num_writers - num_readers;
  145. while (extra)
  146. {
  147. int x = 0;
  148. q.pop(x);
  149. if (x == STOP_SIGN) extra--;
  150. else remnants.insert(x);
  151. }
  152. }
  153. // Solution to (3)
  154. if (num_readers > num_writers)
  155. {
  156. int extra = num_readers - num_writers;
  157. while (extra--) q.push(STOP_SIGN);
  158. }
  159. // Wait for completions
  160. for (auto &t : readers) t.join();
  161. for (auto &t : writers) t.join();
  162. // Accumulate and validate the result
  163. S result(remnants.begin(), remnants.end());
  164. for (const auto &s : storage) result.insert(s.begin(), s.end());
  165. EXPECT_EQ(reference, result);
  166. }
  167. INSTANTIATE_TEST_CASE_P(ConcQueueStress, ConcQueue_,
  168. Combine( Values(1, 2, 4, 8, 16) // writers
  169. , Values(1, 32, 96, 256) // writes
  170. , Values(1, 2, 10) // readers
  171. , Values(0u, 16u, 32u))); // capacity
  172. } // namespace opencv_test