PothosUtilProxyServer.cpp 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. // Copyright (c) 2013-2017 Josh Blum
  2. // SPDX-License-Identifier: BSL-1.0
  3. #include "PothosUtil.hpp"
  4. #include <Pothos/Init.hpp>
  5. #include <Pothos/Remote.hpp>
  6. #include <Pothos/Util/Network.hpp>
  7. #include <Poco/Net/ServerSocket.h>
  8. #include <Poco/Net/SocketStream.h>
  9. #include <Poco/Net/TCPServer.h>
  10. #include <Poco/Process.h>
  11. #include <Poco/URI.h>
  12. #include <mutex>
  13. #include <cassert>
  14. #include <iostream>
  15. /***********************************************************************
  16. * TCP connection factory
  17. * - create connection handler threads
  18. * - monitor connection start and stop
  19. * - kill process in require active mode
  20. **********************************************************************/
  21. class MyTCPServerConnectionFactory : public Poco::Net::TCPServerConnectionFactory
  22. {
  23. public:
  24. MyTCPServerConnectionFactory(const bool requireActive):
  25. _numConnections(0),
  26. _requireActive(requireActive)
  27. {
  28. return;
  29. }
  30. Poco::Net::TCPServerConnection *createConnection(const Poco::Net::StreamSocket &socket);
  31. void connectionStart(void)
  32. {
  33. std::unique_lock<std::mutex> lock(_mutex);
  34. _numConnections++;
  35. }
  36. void connectionStop(void)
  37. {
  38. std::unique_lock<std::mutex> lock(_mutex);
  39. assert(_numConnections != 0);
  40. _numConnections--;
  41. if (_numConnections == 0 and _requireActive)
  42. {
  43. std::cerr << "Proxy server: No active connections - terminating" << std::endl;
  44. Poco::Process::requestTermination(Poco::Process::id());
  45. }
  46. }
  47. private:
  48. std::mutex _mutex;
  49. size_t _numConnections;
  50. const bool _requireActive;
  51. };
  52. /***********************************************************************
  53. * TCP connection thread
  54. * - create the remote handler for the connection
  55. * - service the remote handler until disconnect
  56. **********************************************************************/
  57. class MyTCPServerConnection : public Poco::Net::TCPServerConnection
  58. {
  59. public:
  60. MyTCPServerConnection(MyTCPServerConnectionFactory &factory, const Poco::Net::StreamSocket &socket):
  61. Poco::Net::TCPServerConnection(socket),
  62. _factory(factory),
  63. _handler(Pothos::RemoteHandler(socket.peerAddress().host().toString()))
  64. {
  65. _factory.connectionStart();
  66. }
  67. ~MyTCPServerConnection(void)
  68. {
  69. _factory.connectionStop();
  70. }
  71. void run(void)
  72. {
  73. this->socket().setNoDelay(true);
  74. Poco::Net::SocketStream socketStream(this->socket());
  75. _handler.runHandler(socketStream, socketStream);
  76. }
  77. private:
  78. MyTCPServerConnectionFactory &_factory;
  79. Pothos::RemoteHandler _handler;
  80. };
  81. Poco::Net::TCPServerConnection *MyTCPServerConnectionFactory::createConnection(const Poco::Net::StreamSocket &socket)
  82. {
  83. return new MyTCPServerConnection(*this, socket);
  84. }
  85. /***********************************************************************
  86. * Spawn TCP proxy server given server URI
  87. **********************************************************************/
  88. void PothosUtilBase::proxyServer(const std::string &, const std::string &uriStr)
  89. {
  90. //remove automatic flushing from iostreams
  91. //only flushes on newlines and intentional flushes
  92. std::cout << std::nounitbuf;
  93. std::cerr << std::nounitbuf;
  94. std::clog << std::nounitbuf;
  95. //fully buffered IO backs up and is not acceptable for logging
  96. //set stdio to be line buffered which is useful for logging
  97. //on windows, line buffering is not supported, use unbuffered
  98. #ifdef _MSC_VER
  99. setvbuf(stdout, nullptr, _IONBF, 0);
  100. setvbuf(stderr, nullptr, _IONBF, 0);
  101. #else
  102. setvbuf(stdout, nullptr, _IOLBF, 0);
  103. setvbuf(stderr, nullptr, _IOLBF, 0);
  104. #endif
  105. Pothos::ScopedInit init;
  106. //parse the URI
  107. const std::string defaultUri = "tcp://"+Pothos::Util::getWildcardAddr(Pothos::RemoteServer::getLocatorPort());
  108. Poco::URI uri(uriStr.empty()?defaultUri:uriStr);
  109. const std::string &host = uri.getHost();
  110. const std::string &port = std::to_string(uri.getPort());
  111. if (uri.getScheme() != "tcp")
  112. {
  113. throw Pothos::Exception("PothosUtil::proxyServer("+uriStr+")", "unsupported URI scheme");
  114. }
  115. //create server socket
  116. Poco::Net::SocketAddress sa(host, port);
  117. Poco::Net::ServerSocket serverSocket(sa);
  118. Poco::Net::TCPServerConnectionFactory::Ptr factory;
  119. factory = new MyTCPServerConnectionFactory(this->config().hasOption("requireActive"));
  120. Poco::Net::TCPServer tcpServer(factory, serverSocket);
  121. //start the server
  122. tcpServer.start();
  123. std::cout << "Host: " << serverSocket.address().host().toString() << std::endl;
  124. std::cout << "Port: " << serverSocket.address().port() << std::endl;
  125. //wait here until the term signal is received
  126. this->waitForTerminationRequest();
  127. }