  ///
  /// \file Framework/InputPort.hpp
  ///
  /// This file provides an interface for a worker's input port.
  ///
  /// \copyright
  /// Copyright (c) 2014-2017 Josh Blum
  /// SPDX-License-Identifier: BSL-1.0
  ///
  #pragma once
  #include <Pothos/Config.hpp>
  #include <Pothos/Object/Object.hpp>
  #include <Pothos/Framework/DType.hpp>
  #include <Pothos/Framework/Label.hpp>
  #include <Pothos/Framework/BufferChunk.hpp>
  #include <Pothos/Framework/BufferAccumulator.hpp>
  #include <Pothos/Util/RingDeque.hpp>
  #include <Pothos/Util/SpinLock.hpp>
  #include <cstddef> //size_t
  #include <string>
  #include <utility> //std::pair
  #include <vector>
  20. namespace Pothos {
  21. class WorkerActor;
  22. class OutputPort;
  23. /*!
  24. * InputPort provides methods to interact with a worker's input ports.
  25. */
  26. class POTHOS_API InputPort
  27. {
  28. public:
  29. //! Destructor
  30. ~InputPort(void);
  31. /*!
  32. * Get the index number of this port.
  33. * An index of -1 means the port cannot be represented by an integer.
  34. * \return the index or -1
  35. */
  36. int index(void) const;
  37. //! Get the string name identifier for this port.
  38. const std::string &name(void) const;
  39. //! Get a displayable name for this port.
  40. const std::string &alias(void) const;
  41. //! Set the displayable alias for this port.
  42. void setAlias(const std::string &alias);
  43. //! Get the data type information for this port.
  44. const DType &dtype(void) const;
  45. //! Get the domain information for this port
  46. const std::string &domain(void) const;
  47. /*!
  48. * Get access to the stream buffer.
  49. * For non-stream ports, this returns an empty buffer chunk.
  50. */
  51. const BufferChunk &buffer(void) const;
  52. /*!
  53. * Get the number of elements available in the stream buffer.
  54. * The number of elements is the available bytes/dtype size.
  55. */
  56. size_t elements(void) const;
  57. /*!
  58. * Get the total number of elements consumed on this port.
  59. * The value returned by this method will not change
  60. * until after execution of work() and propagateLabels().
  61. */
  62. unsigned long long totalElements(void) const;
  63. /*!
  64. * Get the total number of buffers posted to this port.
  65. * Note that this call tracks incoming buffer count,
  66. * and not total buffer consumption (which is harder).
  67. */
  68. unsigned long long totalBuffers(void) const;
  69. /*!
  70. * Get the total number of labels consumed from this port.
  71. * This count updates immediately upon calling removeLabel(),
  72. * and after after execution of work() and propagateLabels().
  73. */
  74. unsigned long long totalLabels(void) const;
  75. /*!
  76. * Get the total number of messages popped from this port.
  77. * The value returned by this method will be incremented
  78. * immediately upon calling popMessage().
  79. */
  80. unsigned long long totalMessages(void) const;
  81. //! Does the specified input port have an asynchronous message available?
  82. bool hasMessage(void);
  83. /*!
  84. * Get an iterator to all input labels for the specified port.
  85. * Labels are sorted in order of oldest to newest by label index.
  86. * \return an iterable object with sorted labels
  87. */
  88. const LabelIteratorRange &labels(void) const;
  89. /*!
  90. * Remove a label from the internal storage structure.
  91. * This invalidates the iterator retrieved from labels().
  92. * Since labels are automatically removed by consume(),
  93. * this call allows a users to simplify state tracking
  94. * in their block implementations by removing labels,
  95. * that might be iterated through on subsequent runs.
  96. */
  97. void removeLabel(const Label &label);
  98. /*!
  99. * Consume elements on this port.
  100. * The number of elements specified must be less than
  101. * or equal to the number of elements available.
  102. * \param numElements the number of elements to consume
  103. */
  104. void consume(const size_t numElements);
  105. /*!
  106. * Take buffer transfers ownership of the buffer to the caller.
  107. * Use takeBuffer() to support perfect buffer forwarding
  108. * with postBuffer() and postMessage() on an output port.
  109. * \code
  110. * auto buff = inPort->takeBuffer();
  111. * outPort->postBuffer(std::move(buff));
  112. * \endcode
  113. * \note Note that takeBuffer() does not consume. The caller must also call
  114. * consume() with the number of elements actually read from the buffer.
  115. * \post buffer() has undefined behavior after this call.
  116. * \return the buffer from this input port
  117. */
  118. BufferChunk takeBuffer(void);
  119. /*!
  120. * Remove and return an asynchronous message from the port.
  121. * If there is no message available, a null Object() is returned.
  122. * \return an asynchronous message object
  123. */
  124. Object popMessage(void);
  125. /*!
  126. * Return an asynchronous message from the port without removing it.
  127. * If there is no message available, a null Object() is returned.
  128. * \return an asynchronous message object
  129. */
  130. Object peekMessage(void);
  131. /*!
  132. * Set a reserve requirement on this input port.
  133. * The reserve size ensures that when sufficient resources are available,
  134. * the buffer will contain at least the specified number of elements.
  135. * By default, each input port has a reserve of zero elements,
  136. * which means that the input port's buffer may be any size,
  137. * including empty, depending upon the available resources.
  138. * Note that work() may still be called when the reserve is not met,
  139. * because the scheduler will only prevent work() from being called
  140. * when all ports fail to meet their respective reserve requirements.
  141. * \param numElements the number of elements to require
  142. */
  143. void setReserve(const size_t numElements);
  144. /*!
  145. * Is this port used for signal handling in a signals + slots paradigm?
  146. */
  147. bool isSlot(void) const;
  148. /*!
  149. * Push a buffer into the buffer queue of this input port.
  150. * This is a thread-safe call, it can be made from any context.
  151. * Use pushBuffer to preload an input port with elements, example:
  152. * a window-sized history of elements for a filter block,
  153. * or a preloaded number of elements for a feedback loop.
  154. */
  155. void pushBuffer(const BufferChunk &buffer);
  156. /*!
  157. * Push a label into the label storage of this input port.
  158. * This is a thread-safe call, it can be made from any context.
  159. * When using this call, first push the buffer with the
  160. * corresponding label index *before* pushing the label.
  161. */
  162. void pushLabel(const Label &label);
  163. /*!
  164. * Push a message into the message queue of this input port.
  165. * This is a thread-safe call, it can be made from any context.
  166. */
  167. void pushMessage(const Object &message);
  168. /*!
  169. * Clear all memory on this input port.
  170. * Clear buffers, labels, and messages.
  171. */
  172. void clear(void);
  173. private:
  174. WorkerActor *_actor;
  175. //port configuration
  176. bool _isSlot;
  177. int _index;
  178. std::string _name;
  179. std::string _alias;
  180. DType _dtype;
  181. std::string _domain;
  182. //state set in pre-work
  183. BufferChunk _buffer;
  184. size_t _elements;
  185. LabelIteratorRange _labelIter;
  186. //port stats
  187. unsigned long long _totalElements;
  188. unsigned long long _totalBuffers;
  189. unsigned long long _totalLabels;
  190. unsigned long long _totalMessages;
  191. //state changes from work
  192. size_t _pendingElements;
  193. size_t _reserveElements;
  194. //counts work actions which we will use to establish activity
  195. size_t _workEvents;
  196. Util::SpinLock _asyncMessagesLock;
  197. Util::RingDeque<std::pair<Object, BufferChunk>> _asyncMessages;
  198. Util::SpinLock _slotCallsLock;
  199. Util::RingDeque<std::pair<Object, BufferChunk>> _slotCalls;
  200. std::vector<Label> _inlineMessages; //user api structure
  201. Util::RingDeque<Label> _inputInlineMessages; //shared structure
  202. Util::SpinLock _bufferAccumulatorLock;
  203. BufferAccumulator _bufferAccumulator;
  204. std::vector<OutputPort *> _subscribers;
  205. /////// async message interface /////////
  206. void asyncMessagesPush(const Object &message, const BufferChunk &token = BufferChunk::null());
  207. bool asyncMessagesEmpty(void);
  208. Object asyncMessagesPop(void);
  209. Object asyncMessagesPeek(void);
  210. void asyncMessagesClear(void);
  211. /////// slot call interface /////////
  212. void slotCallsPush(const Object &args, const BufferChunk &token);
  213. bool slotCallsEmpty(void);
  214. Object slotCallsPop(void);
  215. void slotCallsClear(void);
  216. /////// inline message interface /////////
  217. void inlineMessagesPush(const Label &label);
  218. void inlineMessagesClear(void);
  219. /////// input buffer interface /////////
  220. void bufferAccumulatorFront(BufferChunk &);
  221. void bufferAccumulatorPush(const BufferChunk &buffer);
  222. void bufferAccumulatorPushNoLock(BufferChunk &&buffer);
  223. void bufferAccumulatorPop(const size_t numBytes);
  224. void bufferAccumulatorRequire(const size_t numBytes);
  225. void bufferAccumulatorClear(void);
  226. /////// combined label association push /////////
  227. void bufferLabelPush(
  228. const bool enableMove,
  229. std::vector<Label> &postedLabels,
  230. Util::RingDeque<BufferChunk> &postedBuffers);
  231. InputPort(void);
  232. InputPort(const InputPort &) = delete; // non construction-copyable
  233. InputPort &operator=(const InputPort &) = delete; // non copyable
  234. friend class WorkerActor;
  235. friend class OutputPort;
  236. };
  237. } //namespace Pothos