diff --git a/include/boost/interprocess/detail/workaround.hpp b/include/boost/interprocess/detail/workaround.hpp index 5089696f..a61f8527 100644 --- a/include/boost/interprocess/detail/workaround.hpp +++ b/include/boost/interprocess/detail/workaround.hpp @@ -180,7 +180,9 @@ //message queue uses a circular queue as index instead of an array (better performance) //Boost version < 1.52 uses an array, so undef this if you want to communicate //with processes compiled with those versions. -#define BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX +#ifndef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX +#define BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX 1 +#endif //Macros for documentation purposes. For code, expands to the argument #define BOOST_INTERPROCESS_IMPDEF(TYPE) TYPE diff --git a/include/boost/interprocess/ipc/message_queue.hpp b/include/boost/interprocess/ipc/message_queue.hpp index db422cc9..27045427 100644 --- a/include/boost/interprocess/ipc/message_queue.hpp +++ b/include/boost/interprocess/ipc/message_queue.hpp @@ -42,6 +42,13 @@ #include //std::size_t #include //memcpy +#ifndef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX +#define BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX 1 +#endif + +#ifndef BOOST_INTERPROCESS_MSG_QUEUE_FULL_DUPLEX +#define BOOST_INTERPROCESS_MSG_QUEUE_FULL_DUPLEX 1 +#endif //!\file //!Describes an inter-process message queue. This class allows sending @@ -181,6 +188,18 @@ class message_queue_t friend class ipcdetail::msg_queue_initialization_func_t; + //!Allows the waiter_count to be properly decremented in the event of an + //!exception received during a blocking or timed wait. + class scoped_waiter_count { + size_type &m_waiter_count; + public: + scoped_waiter_count(size_type &waiter_count): + m_waiter_count(waiter_count) { + ++m_waiter_count; + } + ~scoped_waiter_count(){ --m_waiter_count; } + }; + bool do_receive(block_t block, void *buffer, size_type buffer_size, size_type &recvd_size, unsigned int &priority, @@ -190,6 +209,13 @@ class message_queue_t const void *buffer, size_type buffer_size, unsigned int priority, const ptime &abs_time); + //!Generalized wait method. Accepts a lock, a condition, a state variable + //!which can evaluate to a bool, and a counter which keeps track of the + //!number of waiters on this condition. Accepts non-blocking, blocking, and + //!timed wait parameters. + template + bool do_wait(LockType &locked_lock, interprocess_condition &condition, const State &desired_state, size_type &waiter_count, block_t block, const boost::posix_time::ptime &abs_time); + //!Returns the needed memory size for the shared message queue. //!Never throws static size_type get_mem_size(size_type max_msg_size, size_type max_num_msg); @@ -312,13 +338,30 @@ class mq_hdr_t : m_max_num_msg(max_num_msg), m_max_msg_size(max_msg_size), m_cur_num_msg(0) - #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) + #if BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX ,m_cur_first_msg(0u) + #endif ,m_blocked_senders(0u) ,m_blocked_receivers(0u) + #if BOOST_INTERPROCESS_MSG_QUEUE_FULL_DUPLEX + ,m_free_waiter_count(0) + ,m_free_stack_top(max_num_msg) #endif { this->initialize_memory(); } + //!Boolean state variable which when true indicates that the queue is no + //!longer full. + class not_full_state { + mq_hdr_t &m_p_hdr; + public: + not_full_state(mq_hdr_t &p_hdr): + m_p_hdr(p_hdr) { + } + operator bool() const { + return !m_p_hdr.is_full(); + } + }; + //!Returns true if the message queue is full bool is_full() const { return m_cur_num_msg == m_max_num_msg; } @@ -331,7 +374,7 @@ class mq_hdr_t void free_top_msg() { --m_cur_num_msg; } - #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) + #if BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX typedef msg_hdr_ptr_t *iterator; @@ -343,10 +386,15 @@ class mq_hdr_t } //!Returns the inserted message with top priority - msg_header &top_msg() + msg_hdr_ptr_t top_msg_ptr() { size_type pos = this->end_pos(); - return *mp_index[pos ? --pos : m_max_num_msg - 1]; + return mp_index[pos ? --pos : m_max_num_msg - 1]; + } + + msg_header &top_msg() + { + return *top_msg_ptr(); } //!Returns the inserted message with bottom priority @@ -382,8 +430,9 @@ class mq_hdr_t } } - msg_header & insert_at(iterator where) + iterator insert_at(iterator where) { + iterator result; iterator it_inserted_ptr_end = this->inserted_ptr_end(); iterator it_inserted_ptr_beg = this->inserted_ptr_begin(); if(where == it_inserted_ptr_beg){ @@ -391,11 +440,11 @@ class mq_hdr_t m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg; --m_cur_first_msg; ++m_cur_num_msg; - return *mp_index[m_cur_first_msg]; + result = &mp_index[m_cur_first_msg]; } else if(where == it_inserted_ptr_end){ ++m_cur_num_msg; - return **it_inserted_ptr_end; + result = it_inserted_ptr_end; } else{ size_type pos = where - &mp_index[0]; @@ -416,7 +465,9 @@ class mq_hdr_t const size_type first_segment_end = pos; const size_type second_segment_beg = unique_segment || !m_cur_first_msg ? m_max_num_msg : m_cur_first_msg; const size_type second_segment_end = m_max_num_msg; + #if !BOOST_INTERPROCESS_MSG_QUEUE_FULL_DUPLEX const msg_hdr_ptr_t backup = *(&mp_index[0] + (unique_segment ? first_segment_beg : second_segment_beg) - 1); + #endif //First segment if(!unique_segment){ @@ -428,11 +479,13 @@ class mq_hdr_t std::copy( &mp_index[0] + first_segment_beg , &mp_index[0] + first_segment_end , &mp_index[0] + first_segment_beg - 1); + #if !BOOST_INTERPROCESS_MSG_QUEUE_FULL_DUPLEX *where = backup; + #endif m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg; --m_cur_first_msg; ++m_cur_num_msg; - return **where; + result = where; } else{ //The queue can't be full so end_pos < m_cur_first_msg @@ -443,7 +496,9 @@ class mq_hdr_t const size_type first_segment_end = unique_segment ? pos_end : m_max_num_msg-1; const size_type second_segment_beg = 0u; const size_type second_segment_end = unique_segment ? 0u : pos_end; + #if !BOOST_INTERPROCESS_MSG_QUEUE_FULL_DUPLEX const msg_hdr_ptr_t backup = *it_inserted_ptr_end; + #endif //First segment if(!unique_segment){ @@ -455,11 +510,14 @@ class mq_hdr_t std::copy_backward( &mp_index[0] + first_segment_beg , &mp_index[0] + first_segment_end , &mp_index[0] + first_segment_end + 1); + #if !BOOST_INTERPROCESS_MSG_QUEUE_FULL_DUPLEX *where = backup; + #endif ++m_cur_num_msg; - return **where; + result = where; } } + return result; } #else //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX @@ -467,8 +525,11 @@ class mq_hdr_t typedef msg_hdr_ptr_t *iterator; //!Returns the inserted message with top priority + msg_hdr_ptr_t top_msg_ptr() + { return mp_index[m_cur_num_msg-1]; } + msg_header &top_msg() - { return *mp_index[m_cur_num_msg-1]; } + { return *top_msg_ptr(); } //!Returns the inserted message with bottom priority msg_header &bottom_msg() @@ -483,43 +544,41 @@ class mq_hdr_t iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor func) { return std::lower_bound(this->inserted_ptr_begin(), this->inserted_ptr_end(), value, func); } - msg_header & insert_at(iterator pos) + iterator insert_at(iterator pos) { + #if !BOOST_INTERPROCESS_MSG_QUEUE_FULL_DUPLEX const msg_hdr_ptr_t backup = *inserted_ptr_end(); + #endif std::copy_backward(pos, inserted_ptr_end(), inserted_ptr_end()+1); - *pos = backup; ++m_cur_num_msg; - return **pos; + #if !BOOST_INTERPROCESS_MSG_QUEUE_FULL_DUPLEX + *pos = backup; + #endif + return pos; } #endif //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX //!Inserts the first free message in the priority queue - msg_header & queue_free_msg(unsigned int priority) + iterator queue_free_msg(msg_hdr_ptr_t hdr) { //Get priority queue's range iterator it (inserted_ptr_begin()), it_end(inserted_ptr_end()); //Optimize for non-priority usage - if(m_cur_num_msg && priority > this->bottom_msg().priority){ + if(m_cur_num_msg && hdr->priority > this->bottom_msg().priority){ //Check for higher priority than all stored messages - if(priority > this->top_msg().priority){ + if(hdr->priority > this->top_msg().priority){ it = it_end; - } - else{ - //Since we don't now which free message we will pick - //build a dummy header for searches - msg_header dummy_hdr; - dummy_hdr.priority = priority; - - //Get free msg - msg_hdr_ptr_t dummy_ptr(&dummy_hdr); - - //Check where the free message should be placed - it = this->lower_bound(dummy_ptr, static_cast&>(*this)); + } else{ + it = this->lower_bound(hdr, static_cast&>(*this)); } } //Insert the free message in the correct position - return this->insert_at(it); + iterator pos = this->insert_at(it); + #if BOOST_INTERPROCESS_MSG_QUEUE_FULL_DUPLEX + *pos = hdr; + #endif + return pos; } //!Returns the number of bytes needed to construct a message queue with @@ -533,8 +592,15 @@ class mq_hdr_t index_align = ::boost::container::container_detail::alignment_of::value, r_hdr_size = ipcdetail::ct_rounded_size::value, r_index_size = ipcdetail::get_rounded_size(max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align), + #if BOOST_INTERPROCESS_MSG_QUEUE_FULL_DUPLEX + r_free_stack_index_size = ipcdetail::get_rounded_size(max_num_msg * sizeof(msg_hdr_ptr_t), msg_hdr_align), + #endif r_max_msg_size = ipcdetail::get_rounded_size(max_msg_size, msg_hdr_align) + sizeof(msg_header); - return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size) + + return r_hdr_size + r_index_size + + #if BOOST_INTERPROCESS_MSG_QUEUE_FULL_DUPLEX + r_free_stack_index_size + + #endif + (max_num_msg*r_max_msg_size) + open_create_impl_t::ManagedOpenOrCreateUserOffset; } @@ -547,22 +613,41 @@ class mq_hdr_t index_align = ::boost::container::container_detail::alignment_of::value, r_hdr_size = ipcdetail::ct_rounded_size::value, r_index_size = ipcdetail::get_rounded_size(m_max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align), + #if BOOST_INTERPROCESS_MSG_QUEUE_FULL_DUPLEX + r_free_stack_index_size = ipcdetail::get_rounded_size(m_max_num_msg * sizeof(msg_hdr_ptr_t), msg_hdr_align), + #endif r_max_msg_size = ipcdetail::get_rounded_size(m_max_msg_size, msg_hdr_align) + sizeof(msg_header); //Pointer to the index - msg_hdr_ptr_t *index = reinterpret_cast - (reinterpret_cast(this)+r_hdr_size); + mp_index = + reinterpret_cast(reinterpret_cast(this) + + r_hdr_size + ); + + #if BOOST_INTERPROCESS_MSG_QUEUE_FULL_DUPLEX + m_free_stack = + reinterpret_cast(reinterpret_cast(this) + + r_hdr_size + + r_index_size + ); + #endif //Pointer to the first message header - msg_header *msg_hdr = reinterpret_cast - (reinterpret_cast(this)+r_hdr_size+r_index_size); - - //Initialize the pointer to the index - mp_index = index; + msg_header *msg_hdr = + reinterpret_cast(reinterpret_cast(this) + + r_hdr_size + + r_index_size + #if BOOST_INTERPROCESS_MSG_QUEUE_FULL_DUPLEX + + r_free_stack_index_size + #endif + ); //Initialize the index so each slot points to a preallocated message for(size_type i = 0; i < m_max_num_msg; ++i){ - index[i] = msg_hdr; + mp_index[i] = msg_hdr; + #if BOOST_INTERPROCESS_MSG_QUEUE_FULL_DUPLEX + m_free_stack[i] = msg_hdr; + #endif msg_hdr = reinterpret_cast (reinterpret_cast(msg_hdr)+r_max_msg_size); } @@ -583,11 +668,17 @@ class mq_hdr_t interprocess_condition m_cond_recv; //Condition block senders when the queue is full interprocess_condition m_cond_send; - #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) //Current start offset in the circular index size_type m_cur_first_msg; size_type m_blocked_senders; size_type m_blocked_receivers; + #if BOOST_INTERPROCESS_MSG_QUEUE_FULL_DUPLEX + size_type m_free_waiter_count; + interprocess_mutex m_free_mutex; + interprocess_condition m_free_condition; + msg_hdr_ptr_ptr_t m_free_stack; + size_type m_free_stack_top; + #else #endif }; @@ -610,12 +701,14 @@ class msg_queue_initialization_func_t size_type maxmsgsize = 0) : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {} - bool operator()(void *address, size_type, bool created) + bool operator()(void *address, size_type size, bool created) { char *mptr; if(created){ + mptr = reinterpret_cast(address); + //Construct the message queue header at the beginning BOOST_TRY{ new (mptr) mq_hdr_t(m_maxmsg, m_maxmsgsize); @@ -717,104 +810,117 @@ inline bool message_queue_t::timed_send } template +template +inline bool message_queue_t::do_wait(LockType &locked_lock, interprocess_condition &condition, const State &desired_state, size_type &waiter_count, block_t block, const boost::posix_time::ptime &abs_time) +{ + if (block != non_blocking) { + if (!desired_state) { + scoped_waiter_count waiter(waiter_count); + if (block == blocking) { + do { + condition.wait(locked_lock); + } + while (!desired_state); + } else { + do { + if (!condition.timed_wait(locked_lock, abs_time)) { + break; + } + } + while (!desired_state); + } + } + } + + return desired_state; +} + +template inline bool message_queue_t::do_send(block_t block, const void *buffer, size_type buffer_size, unsigned int priority, const boost::posix_time::ptime &abs_time) { + typedef ipcdetail::msg_hdr_t msg_header; + typedef typename boost::intrusive::pointer_traits::template rebind_pointer::type msg_hdr_ptr_t; + + bool result = false; + ipcdetail::mq_hdr_t *p_hdr = static_cast*>(m_shmem.get_user_address()); + //Check if buffer is smaller than maximum allowed if (buffer_size > p_hdr->m_max_msg_size) { throw interprocess_exception(size_error); } - #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) - bool notify_blocked_receivers = false; - #endif - //--------------------------------------------- - scoped_lock lock(p_hdr->m_mutex); - //--------------------------------------------- + #if BOOST_INTERPROCESS_MSG_QUEUE_FULL_DUPLEX { - //If the queue is full execute blocking logic - if (p_hdr->is_full()) { - BOOST_TRY{ - #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX - ++p_hdr->m_blocked_senders; - #endif - switch(block){ - case non_blocking : - #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX - --p_hdr->m_blocked_senders; - #endif - return false; - break; - - case blocking : - do{ - p_hdr->m_cond_send.wait(lock); - } - while (p_hdr->is_full()); - break; - - case timed : - do{ - if(!p_hdr->m_cond_send.timed_wait(lock, abs_time)){ - if(p_hdr->is_full()){ - #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX - --p_hdr->m_blocked_senders; - #endif - return false; - } - break; - } - } - while (p_hdr->is_full()); - break; - default: - break; + //--------------------------------------------- + scoped_lock lock(p_hdr->m_free_mutex); + //--------------------------------------------- + + if (do_wait(lock, p_hdr->m_free_condition, p_hdr->m_free_stack_top, p_hdr->m_free_waiter_count, block, abs_time)) { + msg_hdr_ptr_t free_header_ptr = p_hdr->m_free_stack[--p_hdr->m_free_stack_top]; + + //--------------------------------------------- + lock.unlock(); + //--------------------------------------------- + + { + msg_header &free_msg_hdr = *free_header_ptr; + //Copy user buffer to the message + std::memcpy(free_msg_hdr.data(), buffer, buffer_size); + + free_msg_hdr.priority = priority; + free_msg_hdr.len = buffer_size; + } + + { + //--------------------------------------------- + scoped_lock lock(p_hdr->m_mutex); + //--------------------------------------------- + + //If the queue is full execute blocking logic + + //Insert the first free message in the priority queue + p_hdr->queue_free_msg(free_header_ptr); + + if (p_hdr->m_blocked_receivers) { + p_hdr->m_cond_recv.notify_one(); } - #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX - --p_hdr->m_blocked_senders; - #endif + } // Lock end + result = true; + } + } + #else + { + //--------------------------------------------- + scoped_lock lock(p_hdr->m_mutex); + //--------------------------------------------- + + typename ipcdetail::mq_hdr_t::not_full_state not_full(*p_hdr); + + if (do_wait(lock, p_hdr->m_cond_send, not_full, p_hdr->m_blocked_senders, block, abs_time)) { + + msg_header dummy; + dummy.priority = priority; + msg_header &free_msg_hdr = **p_hdr->queue_free_msg(&dummy); + + { + free_msg_hdr.priority = priority; + free_msg_hdr.len = buffer_size; + std::memcpy(free_msg_hdr.data(), buffer, buffer_size); } - BOOST_CATCH(...){ - #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX - --p_hdr->m_blocked_senders; - #endif - BOOST_RETHROW; + + if (p_hdr->m_blocked_receivers) { + p_hdr->m_cond_recv.notify_one(); } - BOOST_CATCH_END - } - #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) - notify_blocked_receivers = 0 != p_hdr->m_blocked_receivers; - #endif - //Insert the first free message in the priority queue - ipcdetail::msg_hdr_t &free_msg_hdr = p_hdr->queue_free_msg(priority); - - //Sanity check, free msgs are always cleaned when received - BOOST_ASSERT(free_msg_hdr.priority == 0); - BOOST_ASSERT(free_msg_hdr.len == 0); - - //Copy control data to the free message - free_msg_hdr.priority = priority; - free_msg_hdr.len = buffer_size; - - //Copy user buffer to the message - std::memcpy(free_msg_hdr.data(), buffer, buffer_size); - } // Lock end - - //Notify outside lock to avoid contention. This might produce some - //spurious wakeups, but it's usually far better than notifying inside. - //If this message changes the queue empty state, notify it to receivers - #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) - if (notify_blocked_receivers){ - p_hdr->m_cond_recv.notify_one(); + result = true; + } } - #else - p_hdr->m_cond_recv.notify_one(); #endif - return true; + return result; } template @@ -848,106 +954,88 @@ inline bool size_type &recvd_size, unsigned int &priority, const boost::posix_time::ptime &abs_time) { + typedef ipcdetail::msg_hdr_t msg_header; + typedef typename boost::intrusive::pointer_traits::template rebind_pointer::type msg_hdr_ptr_t; + + bool result = false; + ipcdetail::mq_hdr_t *p_hdr = static_cast*>(m_shmem.get_user_address()); + //Check if buffer is big enough for any message if (buffer_size < p_hdr->m_max_msg_size) { throw interprocess_exception(size_error); } - #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) - bool notify_blocked_senders = false; - #endif - //--------------------------------------------- - scoped_lock lock(p_hdr->m_mutex); - //--------------------------------------------- + #if BOOST_INTERPROCESS_MSG_QUEUE_FULL_DUPLEX { - //If there are no messages execute blocking logic - if (p_hdr->is_empty()) { - BOOST_TRY{ - #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) - ++p_hdr->m_blocked_receivers; - #endif - switch(block){ - case non_blocking : - #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) - --p_hdr->m_blocked_receivers; - #endif - return false; - break; - - case blocking : - do{ - p_hdr->m_cond_recv.wait(lock); - } - while (p_hdr->is_empty()); - break; - - case timed : - do{ - if(!p_hdr->m_cond_recv.timed_wait(lock, abs_time)){ - if(p_hdr->is_empty()){ - #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) - --p_hdr->m_blocked_receivers; - #endif - return false; - } - break; - } - } - while (p_hdr->is_empty()); - break; - - //Paranoia check - default: - break; + //--------------------------------------------- + scoped_lock lock(p_hdr->m_mutex); + //--------------------------------------------- + + if (do_wait(lock, p_hdr->m_cond_recv, p_hdr->m_cur_num_msg, p_hdr->m_blocked_receivers, block, abs_time)) { + //There is at least one message ready to pick, get the top one + msg_hdr_ptr_t top_msg_ptr = p_hdr->top_msg_ptr(); + + //Free top message and put it in the free message list + p_hdr->free_top_msg(); + + // Release the main message queue mutex. + //--------------------------------------------- + lock.unlock(); + //--------------------------------------------- + + ipcdetail::msg_hdr_t &top_msg = *top_msg_ptr; + // Copy message to user buffer. + std::memcpy(buffer, top_msg.data(), top_msg.len); + + recvd_size = top_msg.len; + priority = top_msg.priority; + + { + // Acquire the free stack mutex. + //--------------------------------------------- + scoped_lock lock(p_hdr->m_free_mutex); + //--------------------------------------------- + + // Release message header to top of free stack. + p_hdr->m_free_stack[p_hdr->m_free_stack_top++] = top_msg_ptr; + + if (p_hdr->m_free_waiter_count) { + p_hdr->m_free_condition.notify_one(); } - #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) - --p_hdr->m_blocked_receivers; - #endif - } - BOOST_CATCH(...){ - #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX) - --p_hdr->m_blocked_receivers; - #endif - BOOST_RETHROW; } - BOOST_CATCH_END - } - #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX - notify_blocked_senders = 0 != p_hdr->m_blocked_senders; - #endif + result = true; + } + } //Lock end + #else + { + //--------------------------------------------- + scoped_lock lock(p_hdr->m_mutex); + //--------------------------------------------- - //There is at least one message ready to pick, get the top one - ipcdetail::msg_hdr_t &top_msg = p_hdr->top_msg(); + if (do_wait(lock, p_hdr->m_cond_recv, p_hdr->m_cur_num_msg, p_hdr->m_blocked_receivers, block, abs_time)) { + //There is at least one message ready to pick, get the top one + ipcdetail::msg_hdr_t &top_msg = p_hdr->top_msg(); - //Get data from the message - recvd_size = top_msg.len; - priority = top_msg.priority; + // Copy message while lock is held. + std::memcpy(buffer, top_msg.data(), top_msg.len); - //Some cleanup to ease debugging - top_msg.len = 0; - top_msg.priority = 0; + recvd_size = top_msg.len; + priority = top_msg.priority; - //Copy data to receiver's bufers - std::memcpy(buffer, top_msg.data(), recvd_size); + //Free top message and put it in the free message list + p_hdr->free_top_msg(); - //Free top message and put it in the free message list - p_hdr->free_top_msg(); + if (p_hdr->m_blocked_senders) { + p_hdr->m_cond_send.notify_one(); + } + result = true; + } } //Lock end - - //Notify outside lock to avoid contention. This might produce some - //spurious wakeups, but it's usually far better than notifying inside. - //If this reception changes the queue full state, notify senders - #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX - if (notify_blocked_senders){ - p_hdr->m_cond_send.notify_one(); - } - #else - p_hdr->m_cond_send.notify_one(); #endif - return true; + return result; } template diff --git a/test/message_queue_test.cpp b/test/message_queue_test.cpp index c4a9233e..4ad255df 100644 --- a/test/message_queue_test.cpp +++ b/test/message_queue_test.cpp @@ -229,10 +229,10 @@ bool test_serialize_db() } //] -static const int MsgSize = 10; +static const int MsgSize = 1920 * 1080 * 3; static const int NumMsg = 1000; -static char msgsend [10]; -static char msgrecv [10]; +static char msgsend [MsgSize]; +static char msgrecv [MsgSize]; static boost::interprocess::message_queue *pmessage_queue; @@ -243,7 +243,7 @@ void receiver() int nummsg = NumMsg; while(nummsg--){ - pmessage_queue->receive(msgrecv, MsgSize, recvd_size, priority); + pmessage_queue->receive(msgrecv, sizeof msgrecv, recvd_size, priority); } } @@ -253,7 +253,7 @@ bool test_buffer_overflow() { std::auto_ptr ptr(new boost::interprocess::message_queue - (create_only, test::get_process_id_name(), 10, 10)); + (create_only, test::get_process_id_name(), NumMsg, MsgSize)); pmessage_queue = ptr.get(); //Launch the receiver thread @@ -264,7 +264,7 @@ bool test_buffer_overflow() int nummsg = NumMsg; while(nummsg--){ - pmessage_queue->send(msgsend, MsgSize, 0); + pmessage_queue->send(msgsend, sizeof msgsend, 0); } boost::interprocess::ipcdetail::thread_join(thread);