.. _program_listing_file_umap_Buffer.cpp: Program Listing for File Buffer.cpp =================================== |exhale_lsh| :ref:`Return to documentation for file ` (``umap/Buffer.cpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp ////////////////////////////////////////////////////////////////////////////// // Copyright 2017-2020 Lawrence Livermore National Security, LLC and other // UMAP Project Developers. See the top-level LICENSE file for details. // // SPDX-License-Identifier: LGPL-2.1-only ////////////////////////////////////////////////////////////////////////////// #include #include // for reading meminfo #include "umap/Buffer.hpp" #include "umap/config.h" #include "umap/FillWorkers.hpp" #include "umap/PageDescriptor.hpp" #include "umap/RegionManager.hpp" #include "umap/WorkerPool.hpp" #include "umap/util/Macros.hpp" namespace Umap { // // Called after data has been placed into the page // void Buffer::mark_page_as_present(PageDescriptor* pd) { lock(); pd->set_state_present(); if ( m_waits_for_state_change ) pthread_cond_broadcast( &m_state_change_cond ); unlock(); } // // Called after page has been flushed to store and page is no longer present // void Buffer::mark_page_as_free( PageDescriptor* pd ) { lock(); UMAP_LOG(Debug, "Removing page: " << pd); pd->region->erase_page_descriptor(pd); m_present_pages.erase(pd->page); pd->set_state_free(); pd->spurious_count = 0; // // We only put the page descriptor back onto the free list if it isn't // deferred. Note: It will be marked as deferred when the page is part of a // Region that has been unmapped. It will become undeferred later when the // eviction manager takes it off the end of the end of the buffer. // if ( ! pd->deferred ) release_page_descriptor(pd); if ( m_waits_for_state_change ) pthread_cond_broadcast( &m_state_change_cond ); pd->page = nullptr; unlock(); } void Buffer::release_page_descriptor( PageDescriptor* pd ) { m_free_pages.push_back(pd); if ( m_waits_for_avail_pd ) pthread_cond_broadcast(&m_avail_pd_cond); } // // Called from Evict Manager to begin eviction process on oldest present // page // PageDescriptor* Buffer::evict_oldest_page() { PageDescriptor* pd = nullptr; lock(); while ( m_busy_pages.size() != 0 ) { pd = m_busy_pages.back(); // Deferred means that this page was previously evicted as part of an // uunmap of a Region. This means that this page descriptor points to a // page that has already been given back to the system so all we need to // do is take it off of the busy list and release the descriptor. // if ( pd->deferred ) { UMAP_LOG(Debug, "Deferred Page: " << pd); // // Make sure that the page has truly been flushed. // wait_for_page_state(pd, PageDescriptor::State::FREE); m_busy_pages.pop_back(); m_stats.pages_deleted++; // // Jump to the next page descriptor // release_page_descriptor(pd); pd = nullptr; } else { UMAP_LOG(Debug, "Normal Page: " << pd); wait_for_page_state(pd, PageDescriptor::State::PRESENT); m_busy_pages.pop_back(); m_stats.pages_deleted++; pd->set_state_leaving(); break; } } unlock(); return pd; } // // Called from Evict Manager to begin eviction process on at most N (=32) // oldest present (non-deferred) pages without waiting for status change // std::vector Buffer::evict_oldest_pages() { std::vector evicted_pages; std::vector pending_pages; const int max_num_evicted_pages = 32; int num_evicted_pages = 0; lock(); size_t num_busy_pages = m_busy_pages.size(); if( num_busy_pages>0 ){ size_t i = num_busy_pages - 1; for(; (i>=0 && num_evicted_pagesdeferred && pd->state == PageDescriptor::State::PRESENT ){ m_stats.pages_deleted++; num_evicted_pages ++; pd->state = PageDescriptor::State::LEAVING; evicted_pages.push_back(pd); }else{ pending_pages.push_back(pd); } m_busy_pages.pop_back(); } size_t num_pending_pages = pending_pages.size(); for(size_t k=0; kdirty ) { PageDescriptor* pd = *it; UMAP_LOG(Debug, "schedule Dirty Page: " << pd); wait_for_page_state(pd, PageDescriptor::State::PRESENT); m_rm.get_evict_manager()->schedule_flush(pd); } } m_rm.get_evict_manager()->WaitAll(); unlock(); } // // Called from uunmap by the unmapping thread of the application // // The idea is to go through the entire buffer and remove (evict) all pages // of the given region descriptor. // void Buffer::evict_region(RegionDescriptor* rd) { if (m_rm.get_num_active_regions() > 1) { lock(); while ( rd->count() ) { auto pd = rd->get_next_page_descriptor(); if(pd->state != PageDescriptor::State::LEAVING ){ pd->deferred = true; wait_for_page_state(pd, PageDescriptor::State::PRESENT); pd->set_state_leaving(); m_rm.get_evict_manager()->schedule_eviction(pd); } wait_for_page_state(pd, PageDescriptor::State::FREE); } unlock(); } else { m_rm.get_evict_manager()->EvictAll(); } } bool Buffer::low_threshold_reached( void ) { return m_busy_pages.size() <= m_evict_low_water; } typedef struct FetchFuncParams { uint64_t psize; RegionDescriptor* rd; Uffd* m_uffd; uint64_t offset_st; uint64_t offset_end; } FetchFuncParams; void *FetchFunc(void *arg) { FetchFuncParams* params = (FetchFuncParams*) arg; uint64_t psize = params->psize; RegionDescriptor* rd = params->rd; Uffd* m_uffd = params->m_uffd; char* region_st = rd->start(); uint64_t offset_st = params->offset_st; uint64_t offset_end= params->offset_end; char* copyin_buf = (char*) malloc(psize); if ( !copyin_buf ) UMAP_ERROR("Failed to allocate copyin_buf"); for(uint64_t offset = offset_st; offset < offset_end; offset+=psize){ if( rd->store()->read_from_store(copyin_buf, psize, offset) == -1) UMAP_ERROR("failed to read_from_store at offset="<copy_in_page(copyin_buf, region_st + offset ); } free(copyin_buf); return NULL; } void Buffer::fetch_and_pin(char* paddr, uint64_t size) { lock(); auto rd = m_rm.containing_region(paddr); if ( rd == nullptr ) UMAP_ERROR("the prefetched region is not found"); /* cap the prefetched region */ char* pend = paddr + size; if( pend > rd->end() ){ pend = (char*) rd->end(); UMAP_LOG(Info, "the prefetched rergion is larger than the region (end at "<store_offset( paddr ); uint64_t offset_end = rd->store_offset( pend ); size = pend - paddr; /* Check free memory */ uint64_t mem_avail_kb = 0; unsigned long mem; std::string token; std::ifstream file("/proc/meminfo"); while (file >> token) { if (token == "MemAvailable:") { if (file >> mem) { mem_avail_kb = mem; } else { UMAP_ERROR("UMAP unable to determine system memory size\n"); } } // ignore rest of the line file.ignore(std::numeric_limits::max(), '\n'); } const uint64_t mem_margin_kb = 16777216; mem_avail_kb = (mem_avail_kb > mem_margin_kb) ?(mem_avail_kb-mem_margin_kb) : 0; uint64_t psize = m_rm.get_umap_page_size(); size_t num_free_pages = m_free_pages.size(); uint64_t free_page_mem = psize * num_free_pages; uint64_t mem_avail = (mem_avail_kb*1024/psize) * psize; UMAP_LOG(Info, " MemAvailable = " << mem << " Mem Usable = " << mem_avail << " fetch_and_pin = " << size << " free_page_mem = " << free_page_mem << " (" << num_free_pages <<" x " << psize <<" )"); /* Reduce the number of free pages if avail mem is insufficient */ if( ( free_page_mem + size) >= mem_avail ){ uint64_t reduced_mem = ( free_page_mem + size) - mem_avail; if( reduced_mem < free_page_mem){ size_t new_num_free_pages = (free_page_mem - reduced_mem)/psize; m_free_pages.resize(new_num_free_pages); m_size = m_busy_pages.size() + m_free_pages.size(); m_evict_low_water = apply_int_percentage(m_rm.get_evict_low_water_threshold(), m_size); m_evict_high_water = apply_int_percentage(m_rm.get_evict_high_water_threshold(), m_size); UMAP_LOG(Info, "Reduced Buffer Size to " << m_size ); }else{ /* TODO: evict current pages? */ UMAP_ERROR("Currently, no support for pinning a region larger than free pages\n"); } } /* get page alighed offset*/ Uffd* m_uffd = m_rm.get_uffd_h(); size_t num_pages = (offset_end - offset_st)/psize; size_t num_fetch_threads = (num_pages>1024) ?8 : 1; size_t stride = num_pages/num_fetch_threads*psize; time_t start = time(NULL); pthread_t fetchThreads[num_fetch_threads]; FetchFuncParams params[num_fetch_threads]; for(int i=0; idirty == false) { work.page_desc = pd; pd->dirty = true; pd->set_state_updating(); UMAP_LOG(Debug, "PRE: " << pd << " From: " << this); } else { static int hiwat = 0; pd->spurious_count++; if (pd->spurious_count > hiwat) { hiwat = pd->spurious_count; UMAP_LOG(Debug, "New Spurious cound high water mark: " << hiwat); } UMAP_LOG(Debug, "SPU: " << pd << " From: " << this); unlock(); return; } } else { // This page has not been brought in yet pd = get_page_descriptor(paddr, rd); pd->data_present = false; work.page_desc = pd; rd->insert_page_descriptor(pd); m_present_pages[pd->page] = pd; if (iswrite) pd->dirty = true; UMAP_LOG(Debug, "NEW: " << pd << " From: " << this); } m_rm.get_fill_workers_h()->send_work(work); // // Kick the eviction daemon if the high water mark has been reached // if ( m_busy_pages.size() == m_evict_high_water ) { WorkItem w; w.type = Umap::WorkItem::WorkType::THRESHOLD; w.page_desc = nullptr; m_rm.get_evict_manager()->send_work(w); } m_stats.events_processed ++; unlock(); } // Return nullptr if page not present, PageDescriptor * otherwise PageDescriptor* Buffer::page_already_present( char* page_addr ) { while (1) { auto pp = m_present_pages.find(page_addr); // // Most likely case // if ( pp == m_present_pages.end() ) return nullptr; // // Next most likely is that it is just present in the buffer // if ( pp->second->state == PageDescriptor::State::PRESENT ) return pp->second; // There is a chance that the state of this page is not/no-longer // PRESENT. If this is the case, we need to wait for it to finish // with whatever is happening to it and then check again // UMAP_LOG(Debug, "Waiting for state: (ANY)" << ", " << pp->second); ++m_stats.waits; ++m_waits_for_state_change; pthread_cond_wait(&m_state_change_cond, &m_mutex); --m_waits_for_state_change; } } PageDescriptor* Buffer::get_page_descriptor(char* vaddr, RegionDescriptor* rd) { while ( m_free_pages.size() == 0 ) { ++m_waits_for_avail_pd; m_stats.not_avail++; ++m_stats.waits; ++m_waits_for_state_change; pthread_cond_wait(&m_avail_pd_cond, &m_mutex); --m_waits_for_avail_pd; } PageDescriptor* rval; rval = m_free_pages.back(); m_free_pages.pop_back(); rval->page = vaddr; rval->region = rd; rval->dirty = false; rval->deferred = false; rval->set_state_filling(); rval->spurious_count = 0; m_stats.pages_inserted++; m_busy_pages.push_front(rval); return rval; } uint64_t Buffer::apply_int_percentage( int percentage, uint64_t item ) { uint64_t rval; if ( percentage < 0 || percentage > 100) UMAP_ERROR("Invalid percentage (" << percentage << ") given"); if ( percentage == 0 || percentage == 100 ) { rval = item; } else { float f = (float)((float)percentage / (float)100.0); rval = f * item; } return rval; } void Buffer::lock() { int err; if ( (err = pthread_mutex_trylock(&m_mutex)) != 0 ) { if (err != EBUSY) UMAP_ERROR("pthread_mutex_trylock failed: " << strerror(err)); if ( (err = pthread_mutex_lock(&m_mutex)) != 0 ) UMAP_ERROR("pthread_mutex_lock failed: " << strerror(err)); m_stats.lock_collision++; } m_stats.lock++; } void Buffer::unlock() { pthread_mutex_unlock(&m_mutex); } void Buffer::wait_for_page_state( PageDescriptor* pd, PageDescriptor::State st) { UMAP_LOG(Debug, "Waiting for state: " << st << ", " << pd); while ( pd->state != st ) { ++m_stats.waits; ++m_waits_for_state_change; pthread_cond_wait(&m_state_change_cond, &m_mutex); --m_waits_for_state_change; } } void Buffer::monitor(void) { const int monitor_interval = m_rm.get_monitor_freq(); UMAP_LOG(Info, "every " << monitor_interval << " seconds"); /* start the monitoring loop */ while( is_monitor_on ){ UMAP_LOG(Info, "m_size = " << m_size << ", num_busy_pages = " << m_busy_pages.size() << ", num_free_pages = " << m_free_pages.size() << ", events_processed = " << m_stats.events_processed ); sleep(monitor_interval); }//End of loop } Buffer::Buffer( void ) : m_rm(RegionManager::getInstance()) , m_size(m_rm.get_max_pages_in_buffer()) , m_waits_for_avail_pd(0) , m_waits_for_state_change(0) { m_array = (PageDescriptor *)calloc(m_size, sizeof(PageDescriptor)); if ( m_array == nullptr ) UMAP_ERROR("Failed to allocate " << m_size*sizeof(PageDescriptor) << " bytes for buffer page descriptors"); for ( int i = 0; i < m_size; ++i ) m_free_pages.push_back(&m_array[i]); pthread_mutex_init(&m_mutex, NULL); pthread_cond_init(&m_avail_pd_cond, NULL); pthread_cond_init(&m_state_change_cond, NULL); m_evict_low_water = apply_int_percentage(m_rm.get_evict_low_water_threshold(), m_size); m_evict_high_water = apply_int_percentage(m_rm.get_evict_high_water_threshold(), m_size); /* monitor page stats periodically */ if( m_rm.get_monitor_freq()>0 ){ is_monitor_on = true; int ret = pthread_create( &monitorThread, NULL, MonitorThreadEntryFunc, this); if (ret) { UMAP_ERROR("Failed to launch the monitor thread"); } }else{ is_monitor_on = false; } } Buffer::~Buffer( void ) { #ifdef UMAP_DISPLAY_STATS std::cout << m_stats << std::endl; #endif if( is_monitor_on ){ is_monitor_on = false; pthread_join( monitorThread , NULL ); } assert("Pages are still present" && m_present_pages.size() == 0); pthread_cond_destroy(&m_avail_pd_cond); pthread_cond_destroy(&m_state_change_cond); pthread_mutex_destroy(&m_mutex); free(m_array); } std::ostream& operator<<(std::ostream& os, const Umap::Buffer* b) { if ( b != nullptr ) { os << "{ m_size: " << b->m_size << ", m_waits_for_avail_pd: " << b->m_waits_for_avail_pd << ", m_present_pages.size(): " << std::setw(2) << b->m_present_pages.size() << ", m_free_pages.size(): " << std::setw(2) << b->m_free_pages.size() << ", m_busy_pages.size(): " << std::setw(2) << b->m_busy_pages.size() << " }" ; } else { os << "{ nullptr }"; } return os; } std::ostream& operator<<(std::ostream& os, const Umap::BufferStats& stats) { os << "Buffer Statisics:\n" << " Pages Inserted: " << std::setw(12) << stats.pages_inserted<< "\n" << " Pages Deleted: " << std::setw(12) << stats.pages_deleted<< "\n" << " Unavailable wait: " << std::setw(12) << stats.not_avail<< "\n" << " Locks: " << std::setw(12) << stats.lock << "\n" << " Lock collisions: " << std::setw(12) << stats.lock_collision << "\n" << " waits: " << std::setw(12) << stats.waits; return os; } } // end of namespace Umap