diff --git a/os/kernel/include/chqueues.h b/os/kernel/include/chqueues.h index 4b55e9c46..d1bad3149 100644 --- a/os/kernel/include/chqueues.h +++ b/os/kernel/include/chqueues.h @@ -67,12 +67,13 @@ typedef void (*qnotify_t)(GenericQueue *qp); * @ref system_states) and is non-blocking. */ struct GenericQueue { + ThreadsQueue q_waiting; /**< @brief Queue of waiting threads. */ + size_t q_counter; /**< @brief Resources counter. */ uint8_t *q_buffer; /**< @brief Pointer to the queue buffer.*/ uint8_t *q_top; /**< @brief Pointer to the first location after the buffer. */ uint8_t *q_wrptr; /**< @brief Write pointer. */ uint8_t *q_rdptr; /**< @brief Read pointer. */ - Semaphore q_sem; /**< @brief Counter @p Semaphore. */ qnotify_t q_notify; /**< @brief Data notification callback. */ }; @@ -84,21 +85,19 @@ struct GenericQueue { * * @iclass */ -#define chQSizeI(qp) ((qp)->q_top - (qp)->q_buffer) +#define chQSizeI(qp) ((size_t)((qp)->q_top - (qp)->q_buffer)) /** * @brief Queue space. * @details Returns the used space if used on an input queue or the empty * space if used on an output queue. - * @note The returned value can be less than zero when there are waiting - * threads on the internal semaphore. * * @param[in] qp pointer to a @p GenericQueue structure. * @return The buffer space. * * @iclass */ -#define chQSpaceI(qp) chSemGetCounterI(&(qp)->q_sem) +#define chQSpaceI(qp) ((size_t)((qp)->q_counter)) /** * @extends GenericQueue @@ -113,6 +112,28 @@ struct GenericQueue { */ typedef GenericQueue InputQueue; +/** + * @brief Returns the filled space into an input queue. + * + * @param[in] iqp pointer to an @p InputQueue structure + * @return The number of full bytes in the queue. + * @retval 0 if the queue is empty. + * + * @iclass + */ +#define chIQGetFullI(iqp) chQSpaceI(iqp) + +/** + * @brief Returns the empty space into an input queue. + * + * @param[in] iqp pointer to an @p InputQueue structure + * @return The number of empty bytes in the queue. + * @retval 0 if the queue is full. + * + * @iclass + */ +#define chIQGetEmptyI(iqp) (chQSizeI(iqp) - chQSpaceI(iqp)) + /** * @brief Evaluates to @p TRUE if the specified input queue is empty. * @@ -135,8 +156,7 @@ typedef GenericQueue InputQueue; * * @iclass */ -#define chIQIsFullI(iqp) ((bool_t)(((iqp)->q_wrptr == (iqp)->q_rdptr) && \ - !chIQIsEmptyI(iqp))) +#define chIQIsFullI(iqp) ((bool_t)(chQSpaceI(iqp) >= chQSizeI(iqp))) /** * @brief Input queue read. @@ -162,13 +182,14 @@ typedef GenericQueue InputQueue; * @param[in] size size of the queue buffer area * @param[in] inotify input notification callback pointer */ -#define _INPUTQUEUE_DATA(name, buffer, size, inotify) { \ - (uint8_t *)(buffer), \ - (uint8_t *)(buffer) + size, \ - (uint8_t *)(buffer), \ - (uint8_t *)(buffer), \ - _SEMAPHORE_DATA(name.q_sem, 0), \ - inotify \ +#define _INPUTQUEUE_DATA(name, buffer, size, inotify) { \ + _THREADSQUEUE_DATA(name), \ + 0, \ + (uint8_t *)(buffer), \ + (uint8_t *)(buffer) + (size), \ + (uint8_t *)(buffer), \ + (uint8_t *)(buffer), \ + inotify \ } /** @@ -197,6 +218,28 @@ typedef GenericQueue InputQueue; */ typedef GenericQueue OutputQueue; + /** + * @brief Returns the filled space into an output queue. + * + * @param[in] oqp pointer to an @p OutputQueue structure + * @return The number of full bytes in the queue. + * @retval 0 if the queue is empty. + * + * @iclass + */ +#define chOQGetFullI(oqp) (chQSizeI(oqp) - chQSpaceI(oqp)) + +/** + * @brief Returns the empty space into an output queue. + * + * @param[in] iqp pointer to an @p OutputQueue structure + * @return The number of empty bytes in the queue. + * @retval 0 if the queue is full. + * + * @iclass + */ +#define chOQGetEmptyI(iqp) chQSpaceI(oqp) + /** * @brief Evaluates to @p TRUE if the specified output queue is empty. * @@ -207,8 +250,7 @@ typedef GenericQueue OutputQueue; * * @iclass */ -#define chOQIsEmptyI(oqp) ((bool_t)(((oqp)->q_wrptr == (oqp)->q_rdptr) && \ - !chOQIsFullI(oqp))) +#define chOQIsEmptyI(oqp) ((bool_t)(chQSpaceI(oqp) >= chQSizeI(oqp))) /** * @brief Evaluates to @p TRUE if the specified output queue is full. @@ -248,13 +290,14 @@ typedef GenericQueue OutputQueue; * @param[in] size size of the queue buffer area * @param[in] onotify output notification callback pointer */ -#define _OUTPUTQUEUE_DATA(name, buffer, size, onotify) { \ - (uint8_t *)(buffer), \ - (uint8_t *)(buffer) + size, \ - (uint8_t *)(buffer), \ - (uint8_t *)(buffer), \ - _SEMAPHORE_DATA(name.q_sem, size), \ - onotify \ +#define _OUTPUTQUEUE_DATA(name, buffer, size, onotify) { \ + _THREADSQUEUE_DATA(name), \ + (size), \ + (uint8_t *)(buffer), \ + (uint8_t *)(buffer) + (size), \ + (uint8_t *)(buffer), \ + (uint8_t *)(buffer), \ + onotify \ } /** @@ -274,7 +317,6 @@ typedef GenericQueue OutputQueue; extern "C" { #endif void chIQInit(InputQueue *iqp, uint8_t *bp, size_t size, qnotify_t infy); - size_t chIQGetFullI(InputQueue *iqp); void chIQResetI(InputQueue *iqp); msg_t chIQPutI(InputQueue *iqp, uint8_t b); msg_t chIQGetTimeout(InputQueue *iqp, systime_t time); @@ -282,7 +324,6 @@ extern "C" { size_t n, systime_t time); void chOQInit(OutputQueue *oqp, uint8_t *bp, size_t size, qnotify_t onfy); - size_t chOQGetFullI(OutputQueue *oqp); void chOQResetI(OutputQueue *oqp); msg_t chOQPutTimeout(OutputQueue *oqp, uint8_t b, systime_t time); msg_t chOQGetI(OutputQueue *oqp); diff --git a/os/kernel/include/chthreads.h b/os/kernel/include/chthreads.h index df6fc329f..215bb9920 100644 --- a/os/kernel/include/chthreads.h +++ b/os/kernel/include/chthreads.h @@ -183,8 +183,10 @@ struct Thread { #define THD_STATE_SNDMSG 11 /** @brief Thread state: Waiting in @p chMsgWait().*/ #define THD_STATE_WTMSG 12 +/** @brief Thread state: Waiting on an I/O queue.*/ +#define THD_STATE_WTQUEUE 13 /** @brief Thread state: After termination.*/ -#define THD_STATE_FINAL 13 +#define THD_STATE_FINAL 14 /* * Various flags into the thread p_flags field. diff --git a/os/kernel/src/chmtx.c b/os/kernel/src/chmtx.c index af2b7f347..df71d1cc6 100644 --- a/os/kernel/src/chmtx.c +++ b/os/kernel/src/chmtx.c @@ -134,14 +134,16 @@ void chMtxLockS(Mutex *mp) { prio_insert(dequeue(tp), (ThreadsQueue *)tp->p_u.wtobjp); tp = ((Mutex *)tp->p_u.wtobjp)->m_owner; continue; -#if CH_USE_CONDVARS | CH_USE_SEMAPHORES_PRIORITY | CH_USE_MESSAGES_PRIORITY +#if CH_USE_CONDVARS | \ + (CH_USE_SEMAPHORES && CH_USE_SEMAPHORES_PRIORITY) | \ + (CH_USE_MESSAGES && CH_USE_MESSAGES_PRIORITY) #if CH_USE_CONDVARS case THD_STATE_WTCOND: #endif -#if CH_USE_SEMAPHORES_PRIORITY +#if CH_USE_SEMAPHORES && CH_USE_SEMAPHORES_PRIORITY case THD_STATE_WTSEM: #endif -#if CH_USE_MESSAGES_PRIORITY +#if CH_USE_MESSAGES && CH_USE_MESSAGES_PRIORITY case THD_STATE_SNDMSGQ: #endif /* Re-enqueues tp with its new priority on the queue.*/ diff --git a/os/kernel/src/chqueues.c b/os/kernel/src/chqueues.c index 05fbddfb9..f6ae10257 100644 --- a/os/kernel/src/chqueues.c +++ b/os/kernel/src/chqueues.c @@ -47,6 +47,30 @@ #if CH_USE_QUEUES || defined(__DOXYGEN__) +/** + * @brief Puts the invoking thread into the queue's threads queue. + * + * @param[out] qp pointer to an @p GenericQueue structure + * @param[in] time the number of ticks before the operation timeouts, + * the following special values are allowed: + * - @a TIME_IMMEDIATE immediate timeout. + * - @a TIME_INFINITE no timeout. + * . + * @return A message specifying how the invoking thread has been + * released from threads queue. + * @retval RDY_OK is the normal exit, thread signaled. + * @retval RDY_RESET if the queue has been reset. + * @retval RDY_TIMEOUT if the queue operation timed out. + */ +static msg_t qwait(GenericQueue *qp, systime_t time) { + + if (TIME_IMMEDIATE == time) + return RDY_TIMEOUT; + currp->p_u.wtobjp = qp; + queue_insert(currp, &qp->q_waiting); + return chSchGoSleepTimeoutS(THD_STATE_WTQUEUE, time); +} + /** * @brief Initializes an input queue. * @details A Semaphore is internally initialized and works as a counter of @@ -64,28 +88,11 @@ */ void chIQInit(InputQueue *iqp, uint8_t *bp, size_t size, qnotify_t infy) { + queue_init(&iqp->q_waiting); + iqp->q_counter = 0; iqp->q_buffer = iqp->q_rdptr = iqp->q_wrptr = bp; iqp->q_top = bp + size; iqp->q_notify = infy; - chSemInit(&iqp->q_sem, 0); -} - -/** - * @brief Returns the filled space into an input queue. - * - * @param[in] iqp pointer to an @p InputQueue structure - * @return The number of bytes in the queue. - * @retval 0 if the queue is empty. - * - * @iclass - */ -size_t chIQGetFullI(InputQueue *iqp) { - cnt_t cnt; - - cnt = chQSpaceI(iqp); - if (cnt < 0) - return 0; - return (size_t)cnt; } /** @@ -102,7 +109,9 @@ size_t chIQGetFullI(InputQueue *iqp) { void chIQResetI(InputQueue *iqp) { iqp->q_rdptr = iqp->q_wrptr = iqp->q_buffer; - chSemResetI(&iqp->q_sem, 0); + iqp->q_counter = 0; + while (notempty(&iqp->q_waiting)) + chSchReadyI(fifo_remove(&iqp->q_waiting))->p_u.rdymsg = RDY_RESET; } /** @@ -123,10 +132,12 @@ msg_t chIQPutI(InputQueue *iqp, uint8_t b) { if (chIQIsFullI(iqp)) return Q_FULL; + iqp->q_counter++; *iqp->q_wrptr++ = b; if (iqp->q_wrptr >= iqp->q_top) iqp->q_wrptr = iqp->q_buffer; - chSemSignalI(&iqp->q_sem); + if (notempty(&iqp->q_waiting)) + chSchReadyI(fifo_remove(&iqp->q_waiting))->p_u.rdymsg = RDY_OK; return Q_OK; } @@ -150,17 +161,21 @@ msg_t chIQPutI(InputQueue *iqp, uint8_t b) { */ msg_t chIQGetTimeout(InputQueue *iqp, systime_t time) { uint8_t b; - msg_t msg; chSysLock(); - if (iqp->q_notify) iqp->q_notify(iqp); - if ((msg = chSemWaitTimeoutS(&iqp->q_sem, time)) < RDY_OK) { - chSysUnlock(); - return msg; + while (chIQIsEmptyI(iqp)) { + msg_t msg; + + if ((msg = qwait((GenericQueue *)iqp, time)) < RDY_OK) { + chSysUnlock(); + return msg; + } } + + iqp->q_counter--; b = *iqp->q_rdptr++; if (iqp->q_rdptr >= iqp->q_top) iqp->q_rdptr = iqp->q_buffer; @@ -202,16 +217,16 @@ size_t chIQReadTimeout(InputQueue *iqp, uint8_t *bp, chSysLock(); while (TRUE) { - if (chIQIsEmptyI(iqp)) { + while (chIQIsEmptyI(iqp)) { if (nfy) nfy(iqp); - if ((chSemWaitTimeoutS(&iqp->q_sem, time) != RDY_OK)) { + if (qwait((GenericQueue *)iqp, time) != RDY_OK) { chSysUnlock(); return r; } } - else - chSemFastWaitI(&iqp->q_sem); + + iqp->q_counter--; *bp++ = *iqp->q_rdptr++; if (iqp->q_rdptr >= iqp->q_top) iqp->q_rdptr = iqp->q_buffer; @@ -245,28 +260,11 @@ size_t chIQReadTimeout(InputQueue *iqp, uint8_t *bp, */ void chOQInit(OutputQueue *oqp, uint8_t *bp, size_t size, qnotify_t onfy) { + queue_init(&oqp->q_waiting); + oqp->q_counter = size; oqp->q_buffer = oqp->q_rdptr = oqp->q_wrptr = bp; oqp->q_top = bp + size; oqp->q_notify = onfy; - chSemInit(&oqp->q_sem, (cnt_t)size); -} - -/** - * @brief Returns the filled space into an output queue. - * - * @param[in] oqp pointer to an @p OutputQueue structure - * @return The number of bytes in the queue. - * @retval 0 if the queue is empty. - * - * @iclass - */ -size_t chOQGetFullI(OutputQueue *oqp) { - cnt_t cnt; - - cnt = chQSpaceI(oqp); - if (cnt < 0) - return chQSizeI(oqp); - return chQSizeI(oqp) - (size_t)cnt; } /** @@ -283,7 +281,9 @@ size_t chOQGetFullI(OutputQueue *oqp) { void chOQResetI(OutputQueue *oqp) { oqp->q_rdptr = oqp->q_wrptr = oqp->q_buffer; - chSemResetI(&oqp->q_sem, (cnt_t)(oqp->q_top - oqp->q_buffer)); + oqp->q_counter = chQSizeI(oqp); + while (notempty(&oqp->q_waiting)) + chSchReadyI(fifo_remove(&oqp->q_waiting))->p_u.rdymsg = RDY_RESET; } /** @@ -307,13 +307,18 @@ void chOQResetI(OutputQueue *oqp) { * @api */ msg_t chOQPutTimeout(OutputQueue *oqp, uint8_t b, systime_t time) { - msg_t msg; chSysLock(); - if ((msg = chSemWaitTimeoutS(&oqp->q_sem, time)) < RDY_OK) { - chSysUnlock(); - return msg; + while (chOQIsFullI(oqp)) { + msg_t msg; + + if ((msg = qwait((GenericQueue *)oqp, time)) < RDY_OK) { + chSysUnlock(); + return msg; + } } + + oqp->q_counter--; *oqp->q_wrptr++ = b; if (oqp->q_wrptr >= oqp->q_top) oqp->q_wrptr = oqp->q_buffer; @@ -341,10 +346,12 @@ msg_t chOQGetI(OutputQueue *oqp) { if (chOQIsEmptyI(oqp)) return Q_EMPTY; + oqp->q_counter++; b = *oqp->q_rdptr++; if (oqp->q_rdptr >= oqp->q_top) oqp->q_rdptr = oqp->q_buffer; - chSemSignalI(&oqp->q_sem); + if (notempty(&oqp->q_waiting)) + chSchReadyI(fifo_remove(&oqp->q_waiting))->p_u.rdymsg = RDY_OK; return b; } @@ -381,16 +388,15 @@ size_t chOQWriteTimeout(OutputQueue *oqp, const uint8_t *bp, chSysLock(); while (TRUE) { - if (chOQIsFullI(oqp)) { + while (chOQIsFullI(oqp)) { if (nfy) nfy(oqp); - if ((chSemWaitTimeoutS(&oqp->q_sem, time) != RDY_OK)) { + if (qwait((GenericQueue *)oqp, time) != RDY_OK) { chSysUnlock(); return w; } } - else - chSemFastWaitI(&oqp->q_sem); + oqp->q_counter--; *oqp->q_wrptr++ = *bp++; if (oqp->q_wrptr >= oqp->q_top) oqp->q_wrptr = oqp->q_buffer; diff --git a/os/kernel/src/chschd.c b/os/kernel/src/chschd.c index 54e7918b1..d41649b4c 100644 --- a/os/kernel/src/chschd.c +++ b/os/kernel/src/chschd.c @@ -130,12 +130,16 @@ static void wakeup(void *p) { /* Handling the special case where the thread has been made ready by another thread with higher priority.*/ return; -#if CH_USE_SEMAPHORES || (CH_USE_CONDVARS && CH_USE_CONDVARS_TIMEOUT) +#if CH_USE_SEMAPHORES || CH_USE_QUEUES || \ + (CH_USE_CONDVARS && CH_USE_CONDVARS_TIMEOUT) #if CH_USE_SEMAPHORES case THD_STATE_WTSEM: chSemFastSignalI((Semaphore *)tp->p_u.wtobjp); /* Falls into, intentional. */ #endif +#if CH_USE_QUEUES + case THD_STATE_WTQUEUE: +#endif #if CH_USE_CONDVARS && CH_USE_CONDVARS_TIMEOUT case THD_STATE_WTCOND: #endif diff --git a/readme.txt b/readme.txt index 7b4729e8b..484a57dd8 100644 --- a/readme.txt +++ b/readme.txt @@ -71,6 +71,7 @@ ***************************************************************************** *** 2.3.3 *** +- FIX: Race condition in output queues (bug 3303908)(backported to 2.2.4). - FIX: Fixed timeout problem in the lwIP interface layer (bug 3302420) (backported to 2.2.4). - NEW: Reorganization of the Cortex-Mx ports in order to reduced code and @@ -89,6 +90,8 @@ - CHANGE: Removed the CH_CURRP_REGISTER_CACHE option, it is GCC-specific so it does not belong to the kernel options. The feature will be eventually reimplemented as a port-specific option. +- CHANGE: chiQGetFullI() and chOQGetFullI() become macros. The queues + subsystem has been optimized and is no more dependent on semaphores. *** 2.3.2 *** - FIX: Fixed invalid BRR() macro in AVR serial driver (bug 3299306)(backported