.. _program_listing_file_umap_WorkerPool.hpp: Program Listing for File WorkerPool.hpp ======================================= |exhale_lsh| :ref:`Return to documentation for file ` (``umap/WorkerPool.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp ////////////////////////////////////////////////////////////////////////////// // Copyright 2017-2020 Lawrence Livermore National Security, LLC and other // UMAP Project Developers. See the top-level LICENSE file for details. // // SPDX-License-Identifier: LGPL-2.1-only ////////////////////////////////////////////////////////////////////////////// #ifndef _UMAP_Pthread_HPP #define _UMAP_Pthread_HPP #include #include #include #include #include "umap/PageDescriptor.hpp" #include "umap/WorkQueue.hpp" #include "umap/util/Macros.hpp" namespace Umap { struct WorkItem { enum WorkType { NONE, EXIT, THRESHOLD, EVICT, FAST_EVICT, FLUSH }; PageDescriptor* page_desc; WorkType type; }; static std::ostream& operator<<(std::ostream& os, const Umap::WorkItem& b) { os << "{ page_desc: " << b.page_desc; switch (b.type) { default: os << ", type: Unknown(" << b.type << ")"; break; case Umap::WorkItem::WorkType::NONE: os << ", type: " << "NONE"; break; case Umap::WorkItem::WorkType::EXIT: os << ", type: " << "EXIT"; break; case Umap::WorkItem::WorkType::THRESHOLD: os << ", type: " << "THRESHOLD"; break; case Umap::WorkItem::WorkType::EVICT: os << ", type: " << "EVICT"; break; case Umap::WorkItem::WorkType::FAST_EVICT: os << ", type: " << "FAST_EVICT"; break; case Umap::WorkItem::WorkType::FLUSH: os << ", type: " << "FLUSH"; break; } os << " }"; return os; } class WorkerPool { public: WorkerPool(const std::string& pool_name, uint64_t num_threads) : m_pool_name(pool_name) , m_num_threads(num_threads) , m_wq(new WorkQueue(num_threads)) { if (m_pool_name.length() > 15) m_pool_name.resize(15); } virtual ~WorkerPool() { stop_thread_pool(); delete m_wq; } void send_work(const WorkItem& work) { m_wq->enqueue(work); } WorkItem get_work() { return m_wq->dequeue(); } bool wq_is_empty( void ) { return m_wq->is_empty(); } void start_thread_pool() { UMAP_LOG(Debug, "Starting " << m_pool_name << " Pool of " << m_num_threads << " threads"); for ( uint64_t i = 0; i < m_num_threads; ++i) { pthread_t t; if (pthread_create(&t, NULL, ThreadEntryFunc, this) != 0) UMAP_ERROR("Failed to launch thread"); if (pthread_setname_np(t, m_pool_name.c_str()) != 0) UMAP_ERROR("Failed to set thread name"); m_threads.push_back(t); } } void stop_thread_pool() { UMAP_LOG(Debug, "Stopping " << m_pool_name << " Pool of " << m_num_threads << " threads"); WorkItem w = {.page_desc = nullptr, .type = Umap::WorkItem::WorkType::EXIT }; // // This will inform all of the threads it is time to go away // for ( uint64_t i = 0; i < m_num_threads; ++i) send_work(w); // // Wait for all of the threads to exit // for ( auto pt : m_threads ) (void) pthread_join(pt, NULL); m_threads.clear(); UMAP_LOG(Debug, m_pool_name << " stopped"); } void wait_for_idle( void ) { m_wq->wait_for_idle(); } protected: virtual void ThreadEntry() = 0; private: static void* ThreadEntryFunc(void * This) { ((WorkerPool *)This)->ThreadEntry(); return NULL; } std::string m_pool_name; uint64_t m_num_threads; WorkQueue* m_wq; std::vector m_threads; }; } // end of namespace Umap #endif // _UMAP_WorkerPool_HPP