123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- // This file is part of OpenCV project.
- // It is subject to the license terms in the LICENSE file found in the top-level directory
- // of this distribution and at http://opencv.org/license.html.
- //
- // Copyright (C) 2019 Intel Corporation
- #include "../test_precomp.hpp"
- #include <unordered_set>
- #include <thread>
- #include "executor/conc_queue.hpp"
- namespace opencv_test
- {
- using namespace cv::gapi;
- TEST(ConcQueue, PushPop)
- {
- own::concurrent_bounded_queue<int> q;
- for (int i = 0; i < 100; i++)
- {
- q.push(i);
- }
- for (int i = 0; i < 100; i++)
- {
- int x;
- q.pop(x);
- EXPECT_EQ(i, x);
- }
- }
- TEST(ConcQueue, TryPop)
- {
- own::concurrent_bounded_queue<int> q;
- int x = 0;
- EXPECT_FALSE(q.try_pop(x));
- q.push(1);
- EXPECT_TRUE(q.try_pop(x));
- EXPECT_EQ(1, x);
- }
- TEST(ConcQueue, Clear)
- {
- own::concurrent_bounded_queue<int> q;
- for (int i = 0; i < 10; i++)
- {
- q.push(i);
- }
- q.clear();
- int x = 0;
- EXPECT_FALSE(q.try_pop(x));
- }
- // In this test, every writer thread produces its own range of integer
- // numbers, writing those to a shared queue.
- //
- // Every reader thread pops elements from the queue (until -1 is
- // reached) and stores those in its own associated set.
- //
- // Finally, the master thread waits for completion of all other
- // threads and verifies that all the necessary data is
- // produced/obtained.
- namespace
- {
- using StressParam = std::tuple<int // Num writer threads
- ,int // Num elements per writer
- ,int // Num reader threads
- ,std::size_t>; // Queue capacity
- constexpr int STOP_SIGN = -1;
- constexpr int BASE = 1000;
- }
- struct ConcQueue_: public ::testing::TestWithParam<StressParam>
- {
- using Q = own::concurrent_bounded_queue<int>;
- using S = std::unordered_set<int>;
- static void writer(int base, int writes, Q& q)
- {
- for (int i = 0; i < writes; i++)
- {
- q.push(base + i);
- }
- q.push(STOP_SIGN);
- }
- static void reader(Q& q, S& s)
- {
- int x = 0;
- while (true)
- {
- q.pop(x);
- if (x == STOP_SIGN) return;
- s.insert(x);
- }
- }
- };
- TEST_P(ConcQueue_, Test)
- {
- int num_writers = 0;
- int num_writes = 0;
- int num_readers = 0;
- std::size_t capacity = 0u;
- std::tie(num_writers, num_writes, num_readers, capacity) = GetParam();
- CV_Assert(num_writers < 20);
- CV_Assert(num_writes < BASE);
- Q q;
- if (capacity)
- {
- // see below (2)
- CV_Assert(static_cast<int>(capacity) > (num_writers - num_readers));
- q.set_capacity(capacity);
- }
- // Start reader threads
- std::vector<S> storage(num_readers);
- std::vector<std::thread> readers;
- for (S& s : storage)
- {
- readers.emplace_back(reader, std::ref(q), std::ref(s));
- }
- // Start writer threads, also pre-generate reference numbers
- S reference;
- std::vector<std::thread> writers;
- for (int w = 0; w < num_writers; w++)
- {
- writers.emplace_back(writer, w*BASE, num_writes, std::ref(q));
- for (int r = 0; r < num_writes; r++)
- {
- reference.insert(w*BASE + r);
- }
- }
- // Every writer puts a STOP_SIGN at the end,
- // There are three cases:
- // 1) num_writers == num_readers
- // every reader should get its own STOP_SIGN from any
- // of the writers
- //
- // 2) num_writers > num_readers
- // every reader will get a STOP_SIGN but there're more
- // STOP_SIGNs may be pushed to the queue - and if this
- // number exceeds capacity, writers block (to a deadlock).
- // The latter situation must be avoided at parameters level.
- // [a] Also not every data produced by writers will be consumed
- // by a reader in this case. Master thread will read the rest
- //
- // 3) num_readers > num_writers
- // in this case, some readers will stuck and will never get
- // a STOP_SIGN. Master thread will push extra STOP_SIGNs to the
- // queue.
- // Solution to (2a)
- S remnants;
- if (num_writers > num_readers)
- {
- int extra = num_writers - num_readers;
- while (extra)
- {
- int x = 0;
- q.pop(x);
- if (x == STOP_SIGN) extra--;
- else remnants.insert(x);
- }
- }
- // Solution to (3)
- if (num_readers > num_writers)
- {
- int extra = num_readers - num_writers;
- while (extra--) q.push(STOP_SIGN);
- }
- // Wait for completions
- for (auto &t : readers) t.join();
- for (auto &t : writers) t.join();
- // Accumulate and validate the result
- S result(remnants.begin(), remnants.end());
- for (const auto &s : storage) result.insert(s.begin(), s.end());
- EXPECT_EQ(reference, result);
- }
- INSTANTIATE_TEST_CASE_P(ConcQueueStress, ConcQueue_,
- Combine( Values(1, 2, 4, 8, 16) // writers
- , Values(1, 32, 96, 256) // writes
- , Values(1, 2, 10) // readers
- , Values(0u, 16u, 32u))); // capacity
- } // namespace opencv_test
|