2#include <boost/asio.hpp>
20namespace asio = boost::asio;
26 : _portNumber(portNumber),
27 _reuseAddress(reuseAddress),
28 _networkName(
std::move(networkName)),
29 _addressDirectory(
std::move(addressDirectory)),
38 :
SocketCommunication(0, false, utils::networking::loopbackInterfaceName(), addressDirectory)
78 acceptor.open(endpoint.protocol());
79 acceptor.set_option(tcp::acceptor::reuse_address(
_reuseAddress));
80 acceptor.bind(endpoint);
91 int requesterCommunicatorSize = -1;
96 acceptor.accept(*socket);
97 boost::asio::ip::tcp::no_delay option(
true);
98 socket->set_option(option);
103 int requesterRank = -1;
105 asio::read(*socket, asio::buffer(&requesterRank,
sizeof(
int)));
108 "Rank {} has already been connected. Duplicate requests are not allowed.", requesterRank);
110 _sockets[requesterRank] = std::move(socket);
114 auto adjustedRequesterRank = requesterRank + rankOffset;
115 send(acceptorRank, adjustedRequesterRank);
116 receive(requesterCommunicatorSize, adjustedRequesterRank);
119 if (peerCurrent == 0) {
120 peerCount = requesterCommunicatorSize;
124 "Requester communicator size is {} which is invalid.", requesterCommunicatorSize);
126 "Current requester size from rank {} is {} but should be {}", requesterRank, requesterCommunicatorSize, peerCount);
127 }
while (++peerCurrent < requesterCommunicatorSize);
143 int requesterCommunicatorSize)
145 PRECICE_TRACE(acceptorName, requesterName, acceptorRank, requesterCommunicatorSize);
146 PRECICE_ASSERT(requesterCommunicatorSize >= 0,
"Requester communicator size has to be positive.");
149 if (requesterCommunicatorSize == 0) {
167 acceptor.open(endpoint.protocol());
168 acceptor.set_option(tcp::acceptor::reuse_address(
_reuseAddress));
169 acceptor.bind(endpoint);
181 for (
int connection = 0; connection < requesterCommunicatorSize; ++connection) {
183 acceptor.accept(*socket);
184 boost::asio::ip::tcp::no_delay option(
true);
185 socket->set_option(option);
190 asio::read(*socket, asio::buffer(&requesterRank,
sizeof(
int)));
191 _sockets[requesterRank] = std::move(socket);
208 int requesterCommunicatorSize)
228 auto results = resolver.resolve(ipAddress, portNumber, boost::asio::ip::resolver_base::numeric_host);
230 auto endpoint = results.begin()->endpoint();
231 boost::system::error_code error = asio::error::host_not_found;
232 socket->connect(endpoint, error);
239 boost::asio::deadline_timer timer(*
_ioContext, boost::posix_time::milliseconds(1));
243 boost::asio::ip::tcp::no_delay option(
true);
244 socket->set_option(option);
248 asio::write(*socket, asio::buffer(&requesterRank,
sizeof(
int)));
250 int acceptorRank = -1;
251 asio::read(*socket, asio::buffer(&acceptorRank,
sizeof(
int)));
254 send(requesterCommunicatorSize, 0);
272 PRECICE_TRACE(acceptorName, requesterName, acceptorRanks, requesterRank);
275 for (
auto const &acceptorRank : acceptorRanks) {
289 PRECICE_DEBUG(
"Requesting connection to {}, port {}", ipAddress, portNumber);
293 auto endpoints = resolver.resolve(ipAddress, portNumber, boost::asio::ip::resolver_base::numeric_host);
295 boost::system::error_code error = asio::error::host_not_found;
296 boost::asio::connect(*socket, endpoints, error);
303 boost::asio::deadline_timer timer(*
_ioContext, boost::posix_time::milliseconds(1));
307 boost::asio::ip::tcp::no_delay option(
true);
308 socket->set_option(option);
311 _sockets[acceptorRank] = std::move(socket);
312 send(requesterRank, acceptorRank);
340 socket.second->shutdown(Socket::shutdown_send);
341 socket.second->close();
359 size_t size = itemToSend.
size() + 1;
361 asio::write(*
_sockets[rankReceiver], asio::buffer(&size,
sizeof(
size_t)));
362 asio::write(*
_sockets[rankReceiver], asio::buffer(itemToSend.
c_str(), size));
364 PRECICE_ERROR(
"Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
378 asio::write(*
_sockets[rankReceiver], asio::buffer(itemsToSend.
data(), itemsToSend.
size() *
sizeof(
int)));
380 PRECICE_ERROR(
"Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
391 create_directories(dir);
393 PRECICE_WARN(
"Creating directory for connection info failed with filesystem error: {}", e.
what());
406 PRECICE_WARN(
"Cleaning up connection info failed with filesystem error {}", e.
what());
422 asio::buffer(itemsToSend.
data(), itemsToSend.
size() *
sizeof(
int)),
424 std::static_pointer_cast<SocketRequest>(request)->complete();
439 asio::write(*
_sockets[rankReceiver], asio::buffer(itemsToSend.
data(), itemsToSend.
size() *
sizeof(
double)));
441 PRECICE_ERROR(
"Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
457 asio::buffer(itemsToSend.
data(), itemsToSend.
size() *
sizeof(
double)),
459 std::static_pointer_cast<SocketRequest>(request)->complete();
474 asio::write(*
_sockets[rankReceiver], asio::buffer(&itemToSend,
sizeof(
double)));
476 PRECICE_ERROR(
"Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
482 return aSend(precice::refToSpan<const double>(itemToSend), rankReceiver);
495 asio::write(*
_sockets[rankReceiver], asio::buffer(&itemToSend,
sizeof(
int)));
497 PRECICE_ERROR(
"Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
503 return aSend(precice::refToSpan<const int>(itemToSend), rankReceiver);
516 asio::write(*
_sockets[rankReceiver], asio::buffer(&itemToSend,
sizeof(
bool)));
518 PRECICE_ERROR(
"Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
534 asio::buffer(&itemToSend,
sizeof(
bool)),
553 asio::read(*
_sockets[rankSender], asio::buffer(&size,
sizeof(
size_t)));
555 asio::read(*
_sockets[rankSender], asio::buffer(msg.
data(), size));
556 itemToReceive = msg.
data();
558 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
572 asio::read(*
_sockets[rankSender], asio::buffer(itemsToReceive.
data(), itemsToReceive.
size() *
sizeof(
int)));
574 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
588 asio::read(*
_sockets[rankSender], asio::buffer(itemsToReceive.
data(), itemsToReceive.
size() *
sizeof(
double)));
590 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
607 asio::async_read(*
_sockets[rankSender],
608 asio::buffer(itemsToReceive.
data(), itemsToReceive.
size() *
sizeof(
double)),
609 [request](boost::system::error_code
const &,
std::size_t) {
610 std::static_pointer_cast<SocketRequest>(request)->complete();
613 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
629 asio::read(*
_sockets[rankSender], asio::buffer(&itemToReceive,
sizeof(
double)));
631 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
637 return aReceive(precice::refToSpan<double>(itemToReceive), rankSender);
650 asio::read(*
_sockets[rankSender], asio::buffer(&itemToReceive,
sizeof(
int)));
652 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
669 asio::async_read(*
_sockets[rankSender],
670 asio::buffer(&itemToReceive,
sizeof(
int)),
671 [request](boost::system::error_code
const &,
std::size_t) {
675 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
691 asio::read(*
_sockets[rankSender], asio::buffer(&itemToReceive,
sizeof(
bool)));
693 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
709 asio::async_read(*
_sockets[rankSender],
710 asio::buffer(&itemToReceive,
sizeof(
bool)),
711 [request](boost::system::error_code
const &,
std::size_t) {
715 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
734 struct if_nameindex *nameInterface = if_nameindex();
735 for (
struct if_nameindex *itNameInterface = nameInterface; itNameInterface->if_index != 0; ++itNameInterface) {
737 interface.index = itNameInterface->if_index;
738 interface.name = itNameInterface->if_name;
741 if_freenameindex(nameInterface);
744 for (
auto &interface : interfaces) {
745 struct ifreq request;
747 interface.name.c_str(),
750 auto socketfd = socket(AF_INET, SOCK_STREAM, 0);
751 if (socketfd == -1) {
754 auto err = ioctl(socketfd, SIOCGIFADDR, &request);
760 const char *addr = inet_ntoa((
reinterpret_cast<struct sockaddr_in *
>(&request.ifr_addr))->sin_addr);
764 interface.address = addr;
782 auto interfaces = detectInterfaces();
785 [&](Interface
const &interface) { return interface.name == _networkName; });
786 if (pos == interfaces.
end()) {
789 err <<
"Cannot find network interface \"" <<
_networkName <<
"\". Available interfaces are: ";
790 for (
const auto &interface : interfaces) {
791 err << interface.name <<
' ';
793 err <<
" Please check \"network\" attributes in your configuration file.";
798 PRECICE_CHECK(not pos->address.empty(),
"The interface \"{}\" does not have an IP address. Please select another interface.",
_networkName);
#define PRECICE_ERROR(...)
#define PRECICE_WARN(...)
#define PRECICE_DEBUG(...)
#define PRECICE_TRACE(...)
#define PRECICE_CHECK(check,...)
#define PRECICE_ASSERT(...)
void setRankOffset(Rank rankOffset)
Set rank offset.
virtual bool isConnected()
Returns true if a connection to a remote participant has been set up.
virtual int adjustRank(Rank rank) const
Adjusts the given rank based on the _rankOffset.
Reads the connection info for the given participant/rank information.
std::string read() const
Reads the info from the connection info file. Will block if the file is not present.
Writes the connection info for the given participant/rank information.
void write(std::string_view info) const
Write the string info, e.g. IP:port to the connection info file.
Implements Communication by using sockets.
std::string getIpAddress()
std::unique_ptr< WorkGuard > _workGuard
PtrRequest aReceive(precice::span< double > itemsToReceive, int rankSender) override
Asynchronously receives an array of double values.
void cleanupEstablishment(std::string const &acceptorName, std::string const &requesterName) override
Clean-up environment used to establish the communication.
void send(std::string const &itemToSend, Rank rankReceiver) override
Sends a std::string to process with given rank.
std::string _addressDirectory
Directory where IP address is exchanged by file.
void acceptConnectionAsServer(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, int acceptorRank, int requesterCommunicatorSize) override
Accepts connection from another communicator, which has to call requestConnectionAsClient().
std::shared_ptr< IOContext > _ioContext
void requestConnectionAsClient(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, std::set< int > const &acceptorRanks, int requesterRank) override
Connects to another communicator, which has to call acceptConnectionAsServer().
std::string _networkName
Name of network to communicate over.
size_t getRemoteCommunicatorSize() override
Returns the number of processes in the remote communicator.
std::map< int, std::shared_ptr< Socket > > _sockets
Remote rank -> socket map.
boost::asio::io_context IOContext
void receive(std::string &itemToReceive, Rank rankSender) override
Receives a std::string from process with given rank.
void closeConnection() override
Disconnects from communication space, i.e. participant.
void prepareEstablishment(std::string const &acceptorName, std::string const &requesterName) override
Prepare environment used to establish the communication.
~SocketCommunication() override
unsigned short _portNumber
Port used for socket connection.
SocketCommunication(unsigned short portNumber=0, bool reuseAddress=false, std::string networkName=utils::networking::loopbackInterfaceName(), std::string addressDirectory=".")
PtrRequest aSend(precice::span< const int > itemsToSend, Rank rankReceiver) override
Asynchronously sends an array of integer values.
void requestConnection(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, int requesterRank, int requesterCommunicatorSize) override
Connects to another communicator, which has to call acceptConnection().
void acceptConnection(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, int acceptorRank, int rankOffset=0) override
Accepts connection from another communicator, which has to call requestConnection().
void dispatch(std::shared_ptr< Socket > sock, boost::asio::const_buffer data, std::function< void()> callback)
Put data in the queue, start processing the queue.
A C++ 11 implementation of the non-owning C++20 std::span type.
constexpr pointer data() const noexcept
constexpr size_type size() const noexcept
T emplace_back(T... args)
T generic_string(T... args)
std::string localDirectory(std::string_view acceptorName, std::string_view requesterName, std::string_view addressDirectory)
contains the data communication abstraction layer.