Program Listing for File WorkerPool.hpp

Return to documentation for file (umap/WorkerPool.hpp)

//////////////////////////////////////////////////////////////////////////////
// Copyright 2017-2020 Lawrence Livermore National Security, LLC and other
// UMAP Project Developers. See the top-level LICENSE file for details.
//
// SPDX-License-Identifier: LGPL-2.1-only
//////////////////////////////////////////////////////////////////////////////
#ifndef _UMAP_Pthread_HPP
#define _UMAP_Pthread_HPP

#include <cstdint>
#include <ostream>
#include <pthread.h>
#include <string>
#include <vector>

#include "umap/PageDescriptor.hpp"
#include "umap/WorkQueue.hpp"
#include "umap/util/Macros.hpp"

namespace Umap {
  //
  // A single request passed to WorkerPool threads through the pool's
  // work queue.
  //
  struct WorkItem {
    //
    // Kind of request carried by the item.  EXIT is the value that
    // WorkerPool::stop_thread_pool() enqueues to tell workers to go away;
    // the meaning of the remaining values is defined by the code that
    // consumes the queue, which is not part of this header.
    //
    enum WorkType {
      NONE,
      EXIT,
      THRESHOLD,
      EVICT,
      FAST_EVICT,
      FLUSH
    };

    // Descriptor that accompanies the request (nullptr for EXIT items
    // created by WorkerPool::stop_thread_pool()).
    PageDescriptor* page_desc;

    // Which kind of request this item represents.
    WorkType type;
  };

  //
  // Writes a WorkItem to a stream as
  //   "{ page_desc: <page_desc>, type: <NAME> }"
  // where <NAME> is the enumerator name, or "Unknown(<value>)" when the
  // type field holds a value that is not one of the WorkType enumerators.
  //
  // Declared inline (rather than static) because it is defined in a header:
  // a static function would be duplicated in every translation unit that
  // includes this file and reported as unused wherever it is not called.
  //
  // \param os  Stream to write to.
  // \param b   Work item to render.
  // \return    os, to allow chaining.
  //
  inline std::ostream& operator<<(std::ostream& os, const Umap::WorkItem& b)
  {
    // Stays null when b.type is not a known enumerator.
    const char* type_name = nullptr;

    switch (b.type) {
      case Umap::WorkItem::WorkType::NONE:       type_name = "NONE";       break;
      case Umap::WorkItem::WorkType::EXIT:       type_name = "EXIT";       break;
      case Umap::WorkItem::WorkType::THRESHOLD:  type_name = "THRESHOLD";  break;
      case Umap::WorkItem::WorkType::EVICT:      type_name = "EVICT";      break;
      case Umap::WorkItem::WorkType::FAST_EVICT: type_name = "FAST_EVICT"; break;
      case Umap::WorkItem::WorkType::FLUSH:      type_name = "FLUSH";      break;
      default: break;
    }

    os << "{ page_desc: " << b.page_desc << ", type: ";

    if (type_name != nullptr)
      os << type_name;
    else
      os << "Unknown(" << b.type << ")";

    os << " }";
    return os;
  }

  class WorkerPool {
    public:
      WorkerPool(const std::string& pool_name, uint64_t num_threads)
        :   m_pool_name(pool_name)
          , m_num_threads(num_threads)
          , m_wq(new WorkQueue<WorkItem>(num_threads))
      {
        if (m_pool_name.length() > 15)
          m_pool_name.resize(15);
      }

      virtual ~WorkerPool() {
        stop_thread_pool();
        delete m_wq;
      }

      void send_work(const WorkItem& work) {
        m_wq->enqueue(work);
      }

      WorkItem get_work() {
        return m_wq->dequeue();
      }

      bool wq_is_empty( void ) {
        return m_wq->is_empty();
      }

      void start_thread_pool() {
        UMAP_LOG(Debug, "Starting " <<  m_pool_name << " Pool of "
            << m_num_threads << " threads");

        for ( uint64_t i = 0; i < m_num_threads; ++i) {
          pthread_t t;

          if (pthread_create(&t, NULL, ThreadEntryFunc, this) != 0)
            UMAP_ERROR("Failed to launch thread");

          if (pthread_setname_np(t, m_pool_name.c_str()) != 0)
            UMAP_ERROR("Failed to set thread name");

          m_threads.push_back(t);
        }
      }

      void stop_thread_pool() {
        UMAP_LOG(Debug, "Stopping " <<  m_pool_name << " Pool of "
            << m_num_threads << " threads");

        WorkItem w = {.page_desc = nullptr, .type = Umap::WorkItem::WorkType::EXIT };

        //
        // This will inform all of the threads it is time to go away
        //
        for ( uint64_t i = 0; i < m_num_threads; ++i)
          send_work(w);

        //
        // Wait for all of the threads to exit
        //
        for ( auto pt : m_threads )
          (void) pthread_join(pt, NULL);

        m_threads.clear();

        UMAP_LOG(Debug, m_pool_name << " stopped");
      }

      void wait_for_idle( void ) {
        m_wq->wait_for_idle();
      }

    protected:
      virtual void ThreadEntry() = 0;

    private:
      static void* ThreadEntryFunc(void * This) {
        ((WorkerPool *)This)->ThreadEntry();
        return NULL;
      }

      std::string             m_pool_name;
      uint64_t                m_num_threads;
      WorkQueue<WorkItem>*    m_wq;
      std::vector<pthread_t>  m_threads;
  };
} // end of namespace Umap
#endif // _UMAP_Pthread_HPP