// Program Listing for File Buffer.cpp
// Return to documentation for file (umap/Buffer.cpp)
//////////////////////////////////////////////////////////////////////////////
// Copyright 2017-2020 Lawrence Livermore National Security, LLC and other
// UMAP Project Developers. See the top-level LICENSE file for details.
//
// SPDX-License-Identifier: LGPL-2.1-only
//////////////////////////////////////////////////////////////////////////////
#include <pthread.h>
#include <fstream> // for reading meminfo
#include <limits>
#include <string>
#include <vector>
#include "umap/Buffer.hpp"
#include "umap/config.h"
#include "umap/FillWorkers.hpp"
#include "umap/PageDescriptor.hpp"
#include "umap/RegionManager.hpp"
#include "umap/WorkerPool.hpp"
#include "umap/util/Macros.hpp"
namespace Umap {
//
// Called after data has been placed into the page: moves the descriptor to
// PRESENT under the buffer lock and wakes every thread sleeping on
// m_state_change_cond (wait_for_page_state() / page_already_present()).
//
void Buffer::mark_page_as_present(PageDescriptor* pd)
{
  lock();
  pd->set_state_present();

  const bool someone_waiting = ( m_waits_for_state_change != 0 );
  if ( someone_waiting )
    pthread_cond_broadcast( &m_state_change_cond );

  unlock();
}
//
// Called after a page has been flushed to the store and is no longer present.
//
// Detaches the descriptor from its region and from m_present_pages, moves it
// to FREE and clears its spurious-fault counter. The descriptor goes back on
// the free list only if it is not deferred. A descriptor is marked deferred
// when its page belongs to a Region that has been unmapped. It becomes
// undeferred later, when the eviction manager takes it off the end of the
// buffer (see evict_oldest_page()).
//
void Buffer::mark_page_as_free( PageDescriptor* pd )
{
  lock();
  UMAP_LOG(Debug, "Removing page: " << pd);

  // Forget the page everywhere it is indexed.
  pd->region->erase_page_descriptor(pd);
  m_present_pages.erase(pd->page);

  pd->set_state_free();
  pd->spurious_count = 0;

  const bool recyclable_now = !pd->deferred;
  if ( recyclable_now )
    release_page_descriptor(pd);

  // Wake threads blocked on a state transition (e.g. waiting for FREE).
  if ( m_waits_for_state_change != 0 )
    pthread_cond_broadcast( &m_state_change_cond );

  pd->page = nullptr;
  unlock();
}
//
// Push pd onto the free list and wake any thread blocked in
// get_page_descriptor() waiting for a descriptor. This function does not take
// m_mutex itself; its callers in this file already hold it.
//
void Buffer::release_page_descriptor( PageDescriptor* pd )
{
  m_free_pages.push_back(pd);

  if ( m_waits_for_avail_pd == 0 )
    return;

  pthread_cond_broadcast(&m_avail_pd_cond);
}
//
// Called from the Evict Manager to begin the eviction process on the oldest
// present page, i.e. the descriptor at the back of m_busy_pages
// (get_page_descriptor() pushes new descriptors onto the front).
//
// Deferred descriptors found at the back are drained along the way. Each one
// is waited on until FREE, popped, and returned to the free list. The first
// non-deferred descriptor is waited on until PRESENT, popped, moved to
// LEAVING and returned. Returns nullptr if m_busy_pages is (or becomes) empty
// before such a descriptor is found.
//
// NOTE(review): wait_for_page_state() releases m_mutex while sleeping, and the
// following pop_back() assumes pd is still at the back of m_busy_pages. That
// only holds if no other thread pops from the back meanwhile (for example
// evict_oldest_pages()). Confirm the Evict Manager serializes these calls.
//
PageDescriptor* Buffer::evict_oldest_page()
{
PageDescriptor* pd = nullptr;
lock();
while ( m_busy_pages.size() != 0 ) {
pd = m_busy_pages.back();
// Deferred means that this page was previously evicted as part of an
// uunmap of a Region. This means that this page descriptor points to a
// page that has already been given back to the system so all we need to
// do is take it off of the busy list and release the descriptor.
//
if ( pd->deferred ) {
UMAP_LOG(Debug, "Deferred Page: " << pd);
//
// Make sure that the page has truly been flushed.
//
wait_for_page_state(pd, PageDescriptor::State::FREE);
m_busy_pages.pop_back();
m_stats.pages_deleted++;
//
// Jump to the next page descriptor
//
// mark_page_as_free() skipped the free list for deferred descriptors, so
// they are recycled here instead.
release_page_descriptor(pd);
pd = nullptr;
}
else {
UMAP_LOG(Debug, "Normal Page: " << pd);
// Do not start evicting a page that is still being filled or updated.
wait_for_page_state(pd, PageDescriptor::State::PRESENT);
m_busy_pages.pop_back();
m_stats.pages_deleted++;
pd->set_state_leaving();
break;
}
}
unlock();
return pd;
}
//
// Called from Evict Manager to begin eviction process on at most N (=32)
// oldest present (non-deferred) pages without waiting for status change
//
std::vector<PageDescriptor*> Buffer::evict_oldest_pages()
{
std::vector<PageDescriptor*> evicted_pages;
std::vector<PageDescriptor*> pending_pages;
const int max_num_evicted_pages = 32;
int num_evicted_pages = 0;
lock();
size_t num_busy_pages = m_busy_pages.size();
if( num_busy_pages>0 ){
size_t i = num_busy_pages - 1;
for(; (i>=0 && num_evicted_pages<max_num_evicted_pages); i--){
PageDescriptor* pd = m_busy_pages[i];
if( !pd->deferred && pd->state == PageDescriptor::State::PRESENT ){
m_stats.pages_deleted++;
num_evicted_pages ++;
pd->state = PageDescriptor::State::LEAVING;
evicted_pages.push_back(pd);
}else{
pending_pages.push_back(pd);
}
m_busy_pages.pop_back();
}
size_t num_pending_pages = pending_pages.size();
for(size_t k=0; k<num_pending_pages; k++){
m_busy_pages.push_back(pending_pages[k]);
}
}
unlock();
return evicted_pages;
}
//
// Schedule a flush with the Evict Manager for every dirty descriptor on
// m_busy_pages, waiting for each one to become PRESENT first. Then call the
// Evict Manager's WaitAll() (presumably waits for the scheduled flushes to
// finish) while still holding the buffer lock.
//
// NOTE(review): wait_for_page_state() releases m_mutex inside
// pthread_cond_wait(), so another thread can run get_page_descriptor() and
// push_front() onto m_busy_pages while this loop holds an iterator into it.
// If m_busy_pages is a std::deque, push_front invalidates all iterators.
// Confirm whether this iteration can be invalidated in practice.
// NOTE(review): WaitAll() is called with m_mutex held. If the flush workers
// need the buffer lock to complete, this could deadlock. Confirm.
//
void Buffer::flush_dirty_pages()
{
lock();
for (auto it = m_busy_pages.begin(); it != m_busy_pages.end(); it++) {
if ( (*it)->dirty ) {
PageDescriptor* pd = *it;
UMAP_LOG(Debug, "schedule Dirty Page: " << pd);
// Do not flush a page whose contents are still being filled/updated.
wait_for_page_state(pd, PageDescriptor::State::PRESENT);
m_rm.get_evict_manager()->schedule_flush(pd);
}
}
m_rm.get_evict_manager()->WaitAll();
unlock();
}
//
// Called from uunmap by the unmapping thread of the application
//
// The idea is to go through the entire buffer and remove (evict) all pages
// of the given region descriptor.
//
// When more than one region is active, each of rd's descriptors is handled
// under the buffer lock:
//   - if it is not already LEAVING, it is marked deferred, waited on until
//     PRESENT, moved to LEAVING and scheduled for eviction;
//   - it is then waited on until FREE.
// The loop terminates because mark_page_as_free() erases each freed
// descriptor from its region, which lowers rd->count(). Deferred descriptors
// are kept off the free list by mark_page_as_free() and recycled later by
// evict_oldest_page().
// When rd is the only active region, the Evict Manager's EvictAll() is used
// instead; the buffer lock is not taken on that path.
//
void Buffer::evict_region(RegionDescriptor* rd)
{
if (m_rm.get_num_active_regions() > 1) {
lock();
while ( rd->count() ) {
auto pd = rd->get_next_page_descriptor();
if(pd->state != PageDescriptor::State::LEAVING ){
pd->deferred = true;
wait_for_page_state(pd, PageDescriptor::State::PRESENT);
pd->set_state_leaving();
m_rm.get_evict_manager()->schedule_eviction(pd);
}
// Block until the eviction of this page has completed.
wait_for_page_state(pd, PageDescriptor::State::FREE);
}
unlock();
}
else {
m_rm.get_evict_manager()->EvictAll();
}
}
//
// True once the number of busy pages has dropped to (or below) the eviction
// low-water mark. Reads m_busy_pages without taking the buffer lock.
//
bool Buffer::low_threshold_reached( void )
{
  return m_evict_low_water >= m_busy_pages.size();
}
//
// Arguments for one FetchFunc thread. The thread reads the store range
// [offset_st, offset_end) of region rd in psize-byte chunks and installs each
// chunk into the region through m_uffd.
//
struct FetchFuncParams {
  uint64_t          psize;       // bytes per read_from_store / copy_in_page
  RegionDescriptor* rd;          // region being fetched
  Uffd*             m_uffd;      // handle used to copy pages into the region
  uint64_t          offset_st;   // first store offset handled (inclusive)
  uint64_t          offset_end;  // end of the store range (exclusive)
};
void *FetchFunc(void *arg)
{
FetchFuncParams* params = (FetchFuncParams*) arg;
uint64_t psize = params->psize;
RegionDescriptor* rd = params->rd;
Uffd* m_uffd = params->m_uffd;
char* region_st = rd->start();
uint64_t offset_st = params->offset_st;
uint64_t offset_end= params->offset_end;
char* copyin_buf = (char*) malloc(psize);
if ( !copyin_buf )
UMAP_ERROR("Failed to allocate copyin_buf");
for(uint64_t offset = offset_st; offset < offset_end; offset+=psize){
if( rd->store()->read_from_store(copyin_buf, psize, offset) == -1)
UMAP_ERROR("failed to read_from_store at offset="<<offset);
m_uffd->copy_in_page(copyin_buf, region_st + offset );
}
free(copyin_buf);
return NULL;
}
void Buffer::fetch_and_pin(char* paddr, uint64_t size)
{
lock();
auto rd = m_rm.containing_region(paddr);
if ( rd == nullptr )
UMAP_ERROR("the prefetched region is not found");
/* cap the prefetched region */
char* pend = paddr + size;
if( pend > rd->end() ){
pend = (char*) rd->end();
UMAP_LOG(Info, "the prefetched rergion is larger than the region (end at "<<pend<<")");
}
/* get aligned fetch size */
uint64_t offset_st = rd->store_offset( paddr );
uint64_t offset_end = rd->store_offset( pend );
size = pend - paddr;
/* Check free memory */
uint64_t mem_avail_kb = 0;
unsigned long mem;
std::string token;
std::ifstream file("/proc/meminfo");
while (file >> token) {
if (token == "MemAvailable:") {
if (file >> mem) {
mem_avail_kb = mem;
} else {
UMAP_ERROR("UMAP unable to determine system memory size\n");
}
}
// ignore rest of the line
file.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
}
const uint64_t mem_margin_kb = 16777216;
mem_avail_kb = (mem_avail_kb > mem_margin_kb) ?(mem_avail_kb-mem_margin_kb) : 0;
uint64_t psize = m_rm.get_umap_page_size();
size_t num_free_pages = m_free_pages.size();
uint64_t free_page_mem = psize * num_free_pages;
uint64_t mem_avail = (mem_avail_kb*1024/psize) * psize;
UMAP_LOG(Info, " MemAvailable = " << mem
<< " Mem Usable = " << mem_avail
<< " fetch_and_pin = " << size
<< " free_page_mem = " << free_page_mem << " (" << num_free_pages <<" x " << psize <<" )");
/* Reduce the number of free pages if avail mem is insufficient */
if( ( free_page_mem + size) >= mem_avail ){
uint64_t reduced_mem = ( free_page_mem + size) - mem_avail;
if( reduced_mem < free_page_mem){
size_t new_num_free_pages = (free_page_mem - reduced_mem)/psize;
m_free_pages.resize(new_num_free_pages);
m_size = m_busy_pages.size() + m_free_pages.size();
m_evict_low_water = apply_int_percentage(m_rm.get_evict_low_water_threshold(), m_size);
m_evict_high_water = apply_int_percentage(m_rm.get_evict_high_water_threshold(), m_size);
UMAP_LOG(Info, "Reduced Buffer Size to " << m_size );
}else{
/* TODO: evict current pages? */
UMAP_ERROR("Currently, no support for pinning a region larger than free pages\n");
}
}
/* get page alighed offset*/
Uffd* m_uffd = m_rm.get_uffd_h();
size_t num_pages = (offset_end - offset_st)/psize;
size_t num_fetch_threads = (num_pages>1024) ?8 : 1;
size_t stride = num_pages/num_fetch_threads*psize;
time_t start = time(NULL);
pthread_t fetchThreads[num_fetch_threads];
FetchFuncParams params[num_fetch_threads];
for(int i=0; i<num_fetch_threads; i++){
params[i].psize = psize;
params[i].rd = rd;
params[i].m_uffd = m_uffd;
params[i].offset_st = offset_st + stride*i;
params[i].offset_end = params[i].offset_st + stride;
if(i==(num_fetch_threads-1))
params[i].offset_end = offset_end;
UMAP_LOG(Info, "FetchThread "<<i<<" ["<<params[i].offset_st<<" , "<<params[i].offset_end<<"]");
int ret = pthread_create(&fetchThreads[i], NULL, FetchFunc, ¶ms[i]);
if (ret) {
UMAP_ERROR("Failed to launch fetchthread "<<i );
}
}
for(int i=0; i<num_fetch_threads; i++)
pthread_join(fetchThreads[i], NULL);
time_t end = time(NULL);
UMAP_LOG(Info,"Fetch_and_pin: "<< (end-start) << " seconds");
unlock();
}
//
// Handle a page event at paddr in region rd; iswrite selects write handling.
// Runs entirely under the buffer lock.
//
//  - Page already PRESENT, iswrite and not yet dirty: mark it dirty, move it
//    to UPDATING and hand it to the fill workers.
//  - Page already PRESENT otherwise: treated as spurious. spurious_count is
//    bumped and the function returns without sending work.
//  - Page not yet in the buffer: take a free descriptor (may block in
//    get_page_descriptor()), register it with the region and m_present_pages,
//    mark it dirty if iswrite, and hand it to the fill workers.
// After sending work, it kicks the Evict Manager with a THRESHOLD item when
// the busy-page count equals the high-water mark.
//
// NOTE(review): the kick uses == rather than >=. If fetch_and_pin() lowers
// m_evict_high_water below the current busy count, no kick fires until the
// count passes through the new mark again. Confirm this is intended.
//
void Buffer::process_page_event(char* paddr, bool iswrite, RegionDescriptor* rd)
{
WorkItem work;
work.type = Umap::WorkItem::WorkType::NONE;
lock();
// Returns nullptr or a PRESENT descriptor; waits out any in-flight state.
auto pd = page_already_present(paddr);
if ( pd != nullptr ) { // Page is already present
if (iswrite && pd->dirty == false) {
work.page_desc = pd;
pd->dirty = true;
pd->set_state_updating();
UMAP_LOG(Debug, "PRE: " << pd << " From: " << this);
}
else {
// Function-local static: one high-water mark shared by all Buffer
// instances; only touched while this buffer's lock is held.
static int hiwat = 0;
pd->spurious_count++;
if (pd->spurious_count > hiwat) {
hiwat = pd->spurious_count;
UMAP_LOG(Debug, "New Spurious cound high water mark: " << hiwat);
}
UMAP_LOG(Debug, "SPU: " << pd << " From: " << this);
unlock();
return;
}
}
else { // This page has not been brought in yet
pd = get_page_descriptor(paddr, rd);
pd->data_present = false;
work.page_desc = pd;
rd->insert_page_descriptor(pd);
m_present_pages[pd->page] = pd;
if (iswrite)
pd->dirty = true;
UMAP_LOG(Debug, "NEW: " << pd << " From: " << this);
}
m_rm.get_fill_workers_h()->send_work(work);
//
// Kick the eviction daemon if the high water mark has been reached
//
if ( m_busy_pages.size() == m_evict_high_water ) {
WorkItem w;
w.type = Umap::WorkItem::WorkType::THRESHOLD;
w.page_desc = nullptr;
m_rm.get_evict_manager()->send_work(w);
}
m_stats.events_processed ++;
unlock();
}
//
// Look up page_addr in m_present_pages; the caller must hold m_mutex.
// Returns nullptr when the page has no descriptor, or its descriptor once it
// is PRESENT. If a descriptor exists in any other state, this sleeps on
// m_state_change_cond (releasing m_mutex meanwhile) and then repeats the whole
// lookup, since the entry may have been removed while waiting.
//
PageDescriptor* Buffer::page_already_present( char* page_addr )
{
  for (;;) {
    const auto entry = m_present_pages.find(page_addr);

    // Most likely case: the page is not in the buffer at all.
    if ( entry == m_present_pages.end() )
      return nullptr;

    PageDescriptor* const pd = entry->second;

    // Next most likely: the page is resident and usable right now.
    if ( pd->state == PageDescriptor::State::PRESENT )
      return pd;

    // The page is in transition (filling, updating, leaving, ...). Wait for
    // any state change, then look it up again.
    UMAP_LOG(Debug, "Waiting for state: (ANY)" << ", " << pd);
    ++m_stats.waits;
    ++m_waits_for_state_change;
    pthread_cond_wait(&m_state_change_cond, &m_mutex);
    --m_waits_for_state_change;
  }
}
//
// Take a descriptor from the free list, bind it to (vaddr, rd) and put it on
// the front (newest end) of m_busy_pages in the FILLING state. The caller must
// hold m_mutex. If the free list is empty, this blocks on m_avail_pd_cond until
// release_page_descriptor() returns a descriptor.
//
PageDescriptor* Buffer::get_page_descriptor(char* vaddr, RegionDescriptor* rd)
{
  while ( m_free_pages.size() == 0 ) {
    // This wait is on m_avail_pd_cond, so only m_waits_for_avail_pd is
    // bumped. The original also incremented m_waits_for_state_change here and
    // never decremented it, which permanently inflated that counter.
    ++m_waits_for_avail_pd;
    m_stats.not_avail++;
    ++m_stats.waits;
    pthread_cond_wait(&m_avail_pd_cond, &m_mutex);
    --m_waits_for_avail_pd;
  }

  PageDescriptor* rval = m_free_pages.back();
  m_free_pages.pop_back();

  rval->page = vaddr;
  rval->region = rd;
  rval->dirty = false;
  rval->deferred = false;
  rval->set_state_filling();
  rval->spurious_count = 0;

  m_stats.pages_inserted++;
  m_busy_pages.push_front(rval);
  return rval;
}
//
// Return floor(item * percentage / 100) for percentage in (0, 100).
// A percentage of 0 or 100 returns item unchanged. Values outside [0, 100] are
// reported through UMAP_ERROR.
//
// Computed in exact integer arithmetic. The previous float version
// (0.9f * item) lost precision once item exceeded 2^24 and could be off by
// many units for large buffers. Splitting item into (item / 100) and
// (item % 100) avoids overflowing item * percentage.
//
uint64_t Buffer::apply_int_percentage( int percentage, uint64_t item )
{
  if ( percentage < 0 || percentage > 100)
    UMAP_ERROR("Invalid percentage (" << percentage << ") given");

  if ( percentage == 0 || percentage == 100 )
    return item;

  const uint64_t pct = static_cast<uint64_t>(percentage);
  return (item / 100) * pct + (item % 100) * pct / 100;
}
//
// Acquire m_mutex. Every acquisition is counted in m_stats.lock. One that
// found the mutex already held (trylock returned EBUSY) is also counted in
// m_stats.lock_collision. Any other pthread error is reported through
// UMAP_ERROR.
//
void Buffer::lock()
{
  const int try_rc = pthread_mutex_trylock(&m_mutex);
  if ( try_rc != 0 ) {
    if ( try_rc != EBUSY )
      UMAP_ERROR("pthread_mutex_trylock failed: " << strerror(try_rc));

    // Contended: fall back to a blocking acquire.
    const int lock_rc = pthread_mutex_lock(&m_mutex);
    if ( lock_rc != 0 )
      UMAP_ERROR("pthread_mutex_lock failed: " << strerror(lock_rc));

    m_stats.lock_collision++;
  }
  m_stats.lock++;
}
//
// Release m_mutex. The pthread return code is intentionally not checked.
//
void Buffer::unlock()
{
  (void)pthread_mutex_unlock(&m_mutex);
}
//
// Block until pd->state equals st; the caller must hold m_mutex. The mutex is
// released while sleeping on m_state_change_cond and re-acquired before each
// re-check. Returns immediately if pd is already in state st.
//
void Buffer::wait_for_page_state( PageDescriptor* pd, PageDescriptor::State st)
{
  UMAP_LOG(Debug, "Waiting for state: " << st << ", " << pd);
  for (;;) {
    if ( pd->state == st )
      return;

    ++m_stats.waits;
    ++m_waits_for_state_change;
    pthread_cond_wait(&m_state_change_cond, &m_mutex);
    --m_waits_for_state_change;
  }
}
void Buffer::monitor(void)
{
const int monitor_interval = m_rm.get_monitor_freq();
UMAP_LOG(Info, "every " << monitor_interval << " seconds");
/* start the monitoring loop */
while( is_monitor_on ){
UMAP_LOG(Info, "m_size = " << m_size
<< ", num_busy_pages = " << m_busy_pages.size()
<< ", num_free_pages = " << m_free_pages.size()
<< ", events_processed = " << m_stats.events_processed );
sleep(monitor_interval);
}//End of loop
}
//
// Construct the page buffer. It allocates m_size zero-filled PageDescriptors
// (m_size comes from RegionManager::get_max_pages_in_buffer()) and puts all
// of them on the free list. It then initialises the mutex and condition
// variables and derives the eviction low/high water marks from the configured
// percentages. Finally, it starts the monitor thread when get_monitor_freq()
// is positive.
//
Buffer::Buffer( void )
  : m_rm(RegionManager::getInstance())
  , m_size(m_rm.get_max_pages_in_buffer())
  , m_waits_for_avail_pd(0)
  , m_waits_for_state_change(0)
{
  m_array = (PageDescriptor *)calloc(m_size, sizeof(PageDescriptor));
  if ( m_array == nullptr )
    UMAP_ERROR("Failed to allocate " << m_size*sizeof(PageDescriptor)
               << " bytes for buffer page descriptors");

  // Index with m_size's own type. An int index mixed signed/unsigned in the
  // comparison and could overflow for buffers larger than INT_MAX pages.
  for ( decltype(m_size) i = 0; i < m_size; ++i )
    m_free_pages.push_back(&m_array[i]);

  pthread_mutex_init(&m_mutex, NULL);
  pthread_cond_init(&m_avail_pd_cond, NULL);
  pthread_cond_init(&m_state_change_cond, NULL);

  m_evict_low_water = apply_int_percentage(m_rm.get_evict_low_water_threshold(), m_size);
  m_evict_high_water = apply_int_percentage(m_rm.get_evict_high_water_threshold(), m_size);

  /* monitor page stats periodically */
  if( m_rm.get_monitor_freq()>0 ){
    is_monitor_on = true;   // set before the thread starts polling it
    int ret = pthread_create( &monitorThread, NULL, MonitorThreadEntryFunc, this);
    if (ret) {
      UMAP_ERROR("Failed to launch the monitor thread");
    }
  }else{
    is_monitor_on = false;
  }
}
//
// Tear down the buffer. In order, it:
//   1. prints the statistics when UMAP_DISPLAY_STATS is defined;
//   2. stops and joins the monitor thread, if one was started;
//   3. asserts that no pages remain in m_present_pages (active only when
//      NDEBUG is not defined);
//   4. destroys the condition variables and mutex and frees the descriptor
//      array.
//
Buffer::~Buffer( void ) {
#ifdef UMAP_DISPLAY_STATS
  std::cout << m_stats << std::endl;
#endif

  const bool monitor_running = is_monitor_on;
  if ( monitor_running ) {
    // Ask the monitor loop to exit, then wait for it (it may be mid-sleep).
    is_monitor_on = false;
    pthread_join( monitorThread , NULL );
  }

  assert("Pages are still present" && m_present_pages.empty());

  pthread_cond_destroy(&m_avail_pd_cond);
  pthread_cond_destroy(&m_state_change_cond);
  pthread_mutex_destroy(&m_mutex);

  free(m_array);
}
//
// Stream a one-line summary of a Buffer: its size, the number of threads
// waiting for a descriptor, and the sizes of its present/free/busy page
// collections. A null pointer prints "{ nullptr }".
//
std::ostream& operator<<(std::ostream& os, const Umap::Buffer* b)
{
  if ( b == nullptr )
    return os << "{ nullptr }";

  os << "{ m_size: " << b->m_size
     << ", m_waits_for_avail_pd: " << b->m_waits_for_avail_pd
     << ", m_present_pages.size(): " << std::setw(2) << b->m_present_pages.size()
     << ", m_free_pages.size(): " << std::setw(2) << b->m_free_pages.size()
     << ", m_busy_pages.size(): " << std::setw(2) << b->m_busy_pages.size()
     << " }";
  return os;
}
//
// Stream the buffer statistics as a multi-line report, one counter per line.
// Each value is right-aligned in a 12-character field. No newline is written
// after the last line.
//
std::ostream& operator<<(std::ostream& os, const Umap::BufferStats& stats)
{
  const int width = 12;
  os << "Buffer Statisics:\n";
  os << " Pages Inserted: "   << std::setw(width) << stats.pages_inserted << "\n";
  os << " Pages Deleted: "    << std::setw(width) << stats.pages_deleted  << "\n";
  os << " Unavailable wait: " << std::setw(width) << stats.not_avail      << "\n";
  os << " Locks: "            << std::setw(width) << stats.lock           << "\n";
  os << " Lock collisions: "  << std::setw(width) << stats.lock_collision << "\n";
  os << " waits: "            << std::setw(width) << stats.waits;
  return os;
}
} // end of namespace Umap