InputPortImpl.hpp 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. ///
  2. /// \file Framework/InputPortImpl.hpp
  3. ///
  4. /// Inline member implementation for InputPort class.
  5. ///
  6. /// \copyright
  7. /// Copyright (c) 2014-2017 Josh Blum
  8. /// SPDX-License-Identifier: BSL-1.0
  9. ///
  10. #pragma once
  11. #include <Pothos/Framework/InputPort.hpp>
  12. #include <mutex> //lock_guard
  13. inline int Pothos::InputPort::index(void) const
  14. {
  15. return _index;
  16. }
  17. inline const std::string &Pothos::InputPort::name(void) const
  18. {
  19. return _name;
  20. }
  21. inline const Pothos::DType &Pothos::InputPort::dtype(void) const
  22. {
  23. return _dtype;
  24. }
  25. inline const std::string &Pothos::InputPort::domain(void) const
  26. {
  27. return _domain;
  28. }
  29. inline const Pothos::BufferChunk &Pothos::InputPort::buffer(void) const
  30. {
  31. return _buffer;
  32. }
  33. inline size_t Pothos::InputPort::elements(void) const
  34. {
  35. return _elements;
  36. }
  37. inline unsigned long long Pothos::InputPort::totalElements(void) const
  38. {
  39. return _totalElements;
  40. }
  41. inline unsigned long long Pothos::InputPort::totalBuffers(void) const
  42. {
  43. return _totalBuffers;
  44. }
  45. inline unsigned long long Pothos::InputPort::totalLabels(void) const
  46. {
  47. return _totalLabels;
  48. }
  49. inline unsigned long long Pothos::InputPort::totalMessages(void) const
  50. {
  51. return _totalMessages;
  52. }
  53. inline const Pothos::LabelIteratorRange &Pothos::InputPort::labels(void) const
  54. {
  55. return _labelIter;
  56. }
  57. inline void Pothos::InputPort::consume(const size_t numElements)
  58. {
  59. _pendingElements += numElements;
  60. }
  61. inline Pothos::BufferChunk Pothos::InputPort::takeBuffer(void)
  62. {
  63. return std::move(_buffer);
  64. }
  65. inline bool Pothos::InputPort::hasMessage(void)
  66. {
  67. return not this->asyncMessagesEmpty();
  68. }
  69. inline bool Pothos::InputPort::isSlot(void) const
  70. {
  71. return _isSlot;
  72. }
  73. inline Pothos::Object Pothos::InputPort::popMessage(void)
  74. {
  75. auto msg = this->asyncMessagesPop();
  76. _totalMessages++;
  77. _workEvents++;
  78. return msg;
  79. }
  80. inline Pothos::Object Pothos::InputPort::peekMessage(void)
  81. {
  82. return this->asyncMessagesPeek();
  83. }
  84. inline void Pothos::InputPort::removeLabel(const Label &label)
  85. {
  86. for (auto it = _inlineMessages.begin(); it != _inlineMessages.end(); it++)
  87. {
  88. if (*it == label)
  89. {
  90. _inlineMessages.erase(it);
  91. _labelIter = _inlineMessages;
  92. _totalLabels++;
  93. _workEvents++;
  94. return;
  95. }
  96. }
  97. }
  98. inline void Pothos::InputPort::setReserve(const size_t numElements)
  99. {
  100. //only mark this change when setting a larger reserve
  101. if (numElements > _reserveElements) _workEvents++;
  102. _reserveElements = numElements;
  103. }
  104. inline bool Pothos::InputPort::asyncMessagesEmpty(void)
  105. {
  106. std::lock_guard<Util::SpinLock> lock(_asyncMessagesLock);
  107. return _asyncMessages.empty();
  108. }
  109. inline Pothos::Object Pothos::InputPort::asyncMessagesPop(void)
  110. {
  111. std::lock_guard<Util::SpinLock> lock(_asyncMessagesLock);
  112. if (_asyncMessages.empty()) return Pothos::Object();
  113. auto msg = std::move(_asyncMessages.front().first);
  114. _asyncMessages.pop_front();
  115. return msg;
  116. }
  117. inline Pothos::Object Pothos::InputPort::asyncMessagesPeek(void)
  118. {
  119. std::lock_guard<Util::SpinLock> lock(_asyncMessagesLock);
  120. if (_asyncMessages.empty()) return Pothos::Object();
  121. return _asyncMessages.front().first;
  122. }
  123. inline void Pothos::InputPort::inlineMessagesPush(const Pothos::Label &label)
  124. {
  125. std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
  126. if (_inputInlineMessages.full()) _inputInlineMessages.set_capacity(_inputInlineMessages.capacity()*2);
  127. _inputInlineMessages.push_back(label);
  128. }
  129. inline void Pothos::InputPort::inlineMessagesClear(void)
  130. {
  131. std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
  132. _inputInlineMessages.clear();
  133. _inlineMessages.clear();
  134. }
  135. inline void Pothos::InputPort::bufferAccumulatorFront(Pothos::BufferChunk &buff)
  136. {
  137. std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
  138. while (not _inputInlineMessages.empty())
  139. {
  140. _inlineMessages.push_back(std::move(_inputInlineMessages.front()));
  141. _inlineMessages.back().adjust(1, this->dtype().size());
  142. _inputInlineMessages.pop_front();
  143. }
  144. buff = _bufferAccumulator.front();
  145. }
  146. inline void Pothos::InputPort::bufferAccumulatorPush(const BufferChunk &buffer)
  147. {
  148. std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
  149. this->bufferAccumulatorPushNoLock(BufferChunk(buffer));
  150. }
  151. inline void Pothos::InputPort::bufferAccumulatorRequire(const size_t numBytes)
  152. {
  153. std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
  154. _bufferAccumulator.require(numBytes);
  155. }
  156. inline void Pothos::InputPort::bufferAccumulatorClear(void)
  157. {
  158. std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
  159. _bufferAccumulator = BufferAccumulator();
  160. }