Block.hpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. ///
  2. /// \file Framework/Block.hpp
  3. ///
  4. /// This file contains the interface for creating custom Blocks.
  5. ///
  6. /// \copyright
  7. /// Copyright (c) 2014-2019 Josh Blum
  8. /// SPDX-License-Identifier: BSL-1.0
  9. ///
  10. #pragma once
  11. #include <Pothos/Config.hpp>
  12. #include <Pothos/Framework/Connectable.hpp>
  13. #include <Pothos/Framework/WorkInfo.hpp>
  14. #include <Pothos/Framework/InputPort.hpp>
  15. #include <Pothos/Framework/OutputPort.hpp>
  16. #include <Pothos/Framework/ThreadPool.hpp>
  17. #include <memory>
  18. #include <string>
  19. #include <vector>
  20. #include <map>
  21. namespace Pothos {
  22. class BufferManager;
  23. /*!
  24. * Block is an interface for creating custom computational processing.
  25. * Users should subclass Block, setup the input and output ports,
  26. * and overload the work() method for a custom computational task.
  27. *
  28. * The outputs of a Block can be connected to the inputs of another.
  29. * Any resources produced at the Block's output ports will be
  30. * make available to the other Block's connected input ports.
  31. */
  32. class POTHOS_API Block : public Connectable
  33. {
  34. public:
  35. //! Default constructor
  36. Block(void);
  37. //! Virtual destructor
  38. virtual ~Block(void);
  39. //! Set the thread pool used by this block.
  40. void setThreadPool(const ThreadPool &threadPool);
  41. //! Get the thread pool used by this block
  42. const ThreadPool &getThreadPool(void) const;
  43. protected:
  44. /*!
  45. * The prepare() method allows the block to say whether it will be able to
  46. * do any work, regardless of whether input or output is available. For
  47. * example, resources unknown to the framework may be unavailable. This call
  48. * happens before the framework determines whether it thinks there is work
  49. * for the block.
  50. *
  51. * The block should not access framework-managed (e.g., ports, messages,
  52. * signals/slots) in this call, though it is free to do other things.
  53. *
  54. * Returns true if the framework should proceed normally and determine
  55. * whether work is available, false if the block would be unable to run and
  56. * should be passed over.
  57. */
  58. virtual bool prepare(void);
  59. /*!
  60. * The work() method, called when resources are available.
  61. * Subclasses must override this call when creating a worker.
  62. *
  63. * When work() is invoked, the user's code can access the port,
  64. * consume input resources, and produce output resources.
  65. *
  66. * Only the work() thread is allowed to call this method,
  67. * therefore users should never directly invoke this method.
  68. */
  69. virtual void work(void);
  70. /*!
  71. * The activate() method, called when the topology execution begins.
  72. * Override this call to implement a custom topology activation hook.
  73. */
  74. virtual void activate(void);
  75. /*!
  76. * The deactivate() method, called when the topology execution ends.
  77. * Override this call to implement a custom topology deactivation hook.
  78. */
  79. virtual void deactivate(void);
  80. /*!
  81. * The work() thread calls the propagateLabels() method after work()
  82. * when labels are available to propagate to downstream consumers.
  83. *
  84. * Default behavior: All labels with an index less than the number
  85. * of consumed elements will be propagated to all output ports.
  86. * Subclasses may override this call to customize its behavior.
  87. *
  88. * Only the work() thread is allowed to call this method,
  89. * therefore users should never directly invoke this method.
  90. *
  91. * Access the labels iterator with the call to input->labels().
  92. * This iterator will contain only labels from the consumed elements.
  93. * Forward labels to the output ports using postLabel() on an output port object.
  94. *
  95. * \param input a pointer to the input port with labels
  96. */
  97. virtual void propagateLabels(const InputPort *input);
  98. /*!
  99. * The opaque call handler handles dispatching calls to registered methods.
  100. * The user may overload this call to install their own custom handler.
  101. * \throws BlockCallNotFound when no call registered for the provided name
  102. * \throws Exception when the registered call itself throws an exception
  103. * \param name the name of a call registered to this Block with registerCall()
  104. * \param inputArgs an array of input arguments wrapped in type Object
  105. * \param numArgs the number of arguments in the array inputArgs
  106. * \return the result of making the registered call, wrapped in type Object
  107. */
  108. virtual Object opaqueCallHandler(const std::string &name, const Object *inputArgs, const size_t numArgs);
  109. /*!
  110. * Get a buffer manager for this input port.
  111. * The user may overload this call to install a custom buffer manager.
  112. *
  113. * The domain parameter describes the memory used by the upstream blocks.
  114. * Knowing the domain allows the implementer of getInputBufferManager to
  115. * - abdicate to the upstream's buffer managers (null return)
  116. * - provide a replacement upstream buffer manager (return manager)
  117. * - protest the ability to interact with the domain (throw exception)
  118. *
  119. * \throws PortDomainError when the domain is incompatible
  120. * \param name the name of an input port on this block
  121. * \param domain the domain of the upstream blocks
  122. * \return a new buffer manager for this port or null sptr
  123. */
  124. virtual std::shared_ptr<BufferManager> getInputBufferManager(const std::string &name, const std::string &domain);
  125. /*!
  126. * Get a buffer manager for this output port.
  127. * The user may overload this call to install a custom buffer manager.
  128. *
  129. * The domain parameter describes the memory used by the downstream blocks.
  130. * Knowing the domain allows the implementer of getOutputBufferManager to
  131. * - abdicate to the downstream's buffer managers (null return)
  132. * - provide a replacement downstream buffer manager (return manager)
  133. * - protest the ability to interact with the domain (throw exception)
  134. *
  135. * \throws PortDomainError when the domain is incompatible
  136. * \param name the name of an output port on this block
  137. * \param domain the domain of the downstream blocks
  138. * \return a new buffer manager for this port or null sptr
  139. */
  140. virtual std::shared_ptr<BufferManager> getOutputBufferManager(const std::string &name, const std::string &domain);
  141. public:
  142. /*!
  143. * Set the displayable alias for the specified input port.
  144. */
  145. void setInputAlias(const std::string &portName, const std::string &alias);
  146. /*!
  147. * Set the displayable alias for the specified output port.
  148. */
  149. void setOutputAlias(const std::string &portName, const std::string &alias);
  150. /*!
  151. * Get a vector of info about all of the input ports available.
  152. */
  153. std::vector<PortInfo> inputPortInfo(void);
  154. /*!
  155. * Get a vector of info about all of the output ports available.
  156. */
  157. std::vector<PortInfo> outputPortInfo(void);
  158. /*!
  159. * Get the input port at the specified port name.
  160. */
  161. InputPort *input(const std::string &name) const;
  162. /*!
  163. * Get the input port at the specified port index.
  164. */
  165. InputPort *input(const size_t index) const;
  166. /*!
  167. * Get the output port at the specified port name.
  168. */
  169. OutputPort *output(const std::string &name) const;
  170. /*!
  171. * Get the output port at the specified port index.
  172. */
  173. OutputPort *output(const size_t index) const;
  174. /*!
  175. * Get the indexable input ports.
  176. * These ports have a port name which is an integer.
  177. * Indexable ports can be accessed with O(1) access time.
  178. * \return a vector of pointers to input port objects
  179. */
  180. const std::vector<InputPort*> &inputs(void) const;
  181. /*!
  182. * Get the indexable output ports.
  183. * These ports have a port name which is an integer.
  184. * Indexable ports can be accessed with O(1) access time.
  185. * \return a vector of pointers to output port objects
  186. */
  187. const std::vector<OutputPort*> &outputs(void) const;
  188. /*!
  189. * Get all input ports.
  190. * These ports can be accessed with the port name string.
  191. * \return a map of pointers to input port objects
  192. */
  193. const std::map<std::string, InputPort*> &allInputs(void) const;
  194. /*!
  195. * Get all output ports.
  196. * These ports can be accessed with the port name string.
  197. * \return a map of pointers to output port objects
  198. */
  199. const std::map<std::string, OutputPort*> &allOutputs(void) const;
  200. /*!
  201. * Get information about a work session that is not port-specific.
  202. * The info is valid during calls to work() and propagateLabels().
  203. */
  204. const WorkInfo &workInfo(void) const;
  205. /*!
  206. * Is the block in an active state?
  207. * This is a thread-safe way for a block's methods
  208. * to determine if the processing is currently active.
  209. * \return true when the topology is executing
  210. */
  211. bool isActive(void) const;
  212. /*!
  213. * Configure an input port with the given data type.
  214. * The data type parameter specifies the size in bytes per input element.
  215. * The data type is only relevant when the port is used for streaming data.
  216. * The domain parameter is used to specify the type of memory consumed.
  217. * The domain will be passed into another block's getOutputBufferManager() call.
  218. * \param name the name of this input port
  219. * \param dtype the data type for elements
  220. * \param domain the expected memory domain
  221. * \return a pointer to the new input port
  222. */
  223. InputPort *setupInput(const std::string &name, const DType &dtype = "", const std::string &domain = "");
  224. /*!
  225. * Configure an input port with the given data type.
  226. * This call is equivalent to setupInput(std::to_string(index), ...);
  227. * \param index the index number of this input port
  228. * \param dtype the data type for elements
  229. * \param domain the expected memory domain
  230. * \return a pointer to the new input port
  231. */
  232. InputPort *setupInput(const size_t index, const DType &dtype = "", const std::string &domain = "");
  233. /*!
  234. * Configure an output port with the given data type.
  235. * The data type parameter specifies the size in bytes per output element.
  236. * The data type is only relevant when the port is used for streaming data.
  237. * The domain parameter is used to specify the type of memory produced.
  238. * The domain will be passed into another block's getInputBufferManager() call.
  239. * \param name the name of this output port
  240. * \param dtype the data type for elements
  241. * \param domain the expected memory domain
  242. * \return a pointer to the new output port
  243. */
  244. OutputPort *setupOutput(const std::string &name, const DType &dtype = "", const std::string &domain = "");
  245. /*!
  246. * Configure an output port with the given data type.
  247. * This call is equivalent to setupOutput(std::to_string(index), ...);
  248. * \param index the index number of this output port
  249. * \param dtype the data type for elements
  250. * \param domain the expected memory domain
  251. * \return a pointer to the new output port
  252. */
  253. OutputPort *setupOutput(const size_t index, const DType &dtype = "", const std::string &domain = "");
  254. /*!
  255. * Export a function call on this block to set/get parameters.
  256. * This call will automatically register a slot of the same name
  257. * when the call has a void return type and the name does not
  258. * start with an underscore in which case its considered private.
  259. * \param name the name of the callable
  260. * \param call the bound callable method
  261. */
  262. void registerCallable(const std::string &name, const Callable &call);
  263. /*!
  264. * Register that this block has a signal of the given name.
  265. * A signal is capable of emitting messages to a slot.
  266. * The name should not overlap with the name of an output port.
  267. * \param name the name of the signal
  268. * \return a pointer to the new output port
  269. */
  270. void registerSignal(const std::string &name);
  271. /*!
  272. * Register that this block has a slot of the given name.
  273. * A slot is capable of accepting messages from a signal.
  274. * The name should not overlap with the name of an input port.
  275. * Note: do not call the registerSlot function in C++,
  276. * as registerCallable() automatically registers a slot.
  277. * \param name the name of the slot
  278. */
  279. void registerSlot(const std::string &name);
  280. /*!
  281. * Register a probe given the name of a registered call.
  282. * A probe creates a special slot that will probe the registered call,
  283. * and creates a triggered signal that will emit the return value of that call.
  284. *
  285. * - Arguments passed into the probe slot will be forwarded into the registered call.
  286. * - The return value of the registered call will be passed into the first argument of the triggered signal.
  287. * - If the return value of the registered call is void, then the triggered signal will have no arguments.
  288. * - When not specified, the slot's name will be probe[Name], and the signal's name will be [name]Triggered.
  289. *
  290. * \param name the name of a registered call
  291. * \param signalName the name of the triggered signal or empty for automatic
  292. * \param slotName the name of the probe slot or empty for automatic
  293. */
  294. void registerProbe(
  295. const std::string &name,
  296. const std::string &signalName="",
  297. const std::string &slotName="");
  298. /*!
  299. * Notify the scheduler that the work() method will yield the thread context.
  300. * Call this method when the work() function will not produce or consume,
  301. * so that the scheduler will call work() again without an external stimulus.
  302. * Only call this method from within a call to the work() function.
  303. * A typical use case for calling yield are blocks that must wait on a resource.
  304. * Such blocks cannot hold the thread context for more than the allowed time,
  305. * and must therefore return from the work() call without producing output.
  306. */
  307. void yield(void);
  308. /*!
  309. * Emit a signal to all subscribed slots.
  310. * \param name the name of a registered signal
  311. * \param args a variable number of arguments
  312. */
  313. template <typename... ArgsType>
  314. void emitSignal(const std::string &name, ArgsType&&... args);
  315. /*!
  316. * Call a method on a derived instance with opaque input and return types.
  317. * \param name the name of the method as a string
  318. * \param inputArgs an array of input arguments
  319. * \param numArgs the size of the input array
  320. * \return the return value as type Object
  321. */
  322. Object opaqueCallMethod(const std::string &name, const Object *inputArgs, const size_t numArgs) const;
  323. private:
  324. WorkInfo _workInfo;
  325. std::vector<std::string> _inputPortNames;
  326. std::vector<std::string> _outputPortNames;
  327. std::vector<InputPort*> _indexedInputs;
  328. std::vector<OutputPort*> _indexedOutputs;
  329. std::map<std::string, InputPort*> _namedInputs;
  330. std::map<std::string, OutputPort*> _namedOutputs;
  331. std::multimap<std::string, Callable> _calls;
  332. std::map<std::string, std::pair<std::string, std::string>> _probes;
  333. ThreadPool _threadPool;
  334. Block(const Block &) = delete; // non construction-copyable
  335. Block &operator=(const Block &) = delete; // non copyable
  336. public:
  337. std::shared_ptr<WorkerActor> _actor;
  338. friend class WorkerActor;
  339. };
  340. } //namespace Pothos