2#include <boost/asio.hpp>
20namespace asio = boost::asio;
26 : _portNumber(portNumber),
27 _reuseAddress(reuseAddress),
28 _networkName(
std::move(networkName)),
29 _addressDirectory(
std::move(addressDirectory)),
38 :
SocketCommunication(0, false, utils::networking::loopbackInterfaceName(), addressDirectory)
78 acceptor.open(endpoint.protocol());
79 acceptor.set_option(tcp::acceptor::reuse_address(
_reuseAddress));
80 acceptor.bind(endpoint);
91 int requesterCommunicatorSize = -1;
94 auto socket = std::make_shared<Socket>(*
_ioService);
96 acceptor.accept(*socket);
100 int requesterRank = -1;
102 asio::read(*socket, asio::buffer(&requesterRank,
sizeof(
int)));
105 "Rank {} has already been connected. Duplicate requests are not allowed.", requesterRank);
107 _sockets[requesterRank] = std::move(socket);
111 auto adjustedRequesterRank = requesterRank + rankOffset;
112 send(acceptorRank, adjustedRequesterRank);
113 receive(requesterCommunicatorSize, adjustedRequesterRank);
116 if (peerCurrent == 0) {
117 peerCount = requesterCommunicatorSize;
121 "Requester communicator size is {} which is invalid.", requesterCommunicatorSize);
123 "Current requester size from rank {} is {} but should be {}", requesterRank, requesterCommunicatorSize, peerCount);
124 }
while (++peerCurrent < requesterCommunicatorSize);
141 int requesterCommunicatorSize)
143 PRECICE_TRACE(acceptorName, requesterName, acceptorRank, requesterCommunicatorSize);
144 PRECICE_ASSERT(requesterCommunicatorSize >= 0,
"Requester communicator size has to be positive.");
147 if (requesterCommunicatorSize == 0) {
165 acceptor.open(endpoint.protocol());
166 acceptor.set_option(tcp::acceptor::reuse_address(
_reuseAddress));
167 acceptor.bind(endpoint);
179 for (
int connection = 0; connection < requesterCommunicatorSize; ++connection) {
180 auto socket = std::make_shared<Socket>(*
_ioService);
181 acceptor.accept(*socket);
186 asio::read(*socket, asio::buffer(&requesterRank,
sizeof(
int)));
187 _sockets[requesterRank] = std::move(socket);
204 int requesterCommunicatorSize)
218 auto socket = std::make_shared<Socket>(*
_ioService);
222 tcp::resolver::query query(tcp::v4(), ipAddress, portNumber, tcp::resolver::query::numeric_host);
226 tcp::resolver::endpoint_type endpoint = *(resolver.resolve(query));
227 boost::system::error_code error = asio::error::host_not_found;
228 socket->connect(endpoint, error);
235 boost::asio::deadline_timer timer(*
_ioService, boost::posix_time::milliseconds(1));
242 asio::write(*socket, asio::buffer(&requesterRank,
sizeof(
int)));
244 int acceptorRank = -1;
245 asio::read(*socket, asio::buffer(&acceptorRank,
sizeof(
int)));
248 send(requesterCommunicatorSize, 0);
266 PRECICE_TRACE(acceptorName, requesterName, acceptorRanks, requesterRank);
269 for (
auto const &acceptorRank : acceptorRanks) {
279 auto socket = std::make_shared<Socket>(*
_ioService);
283 PRECICE_DEBUG(
"Requesting connection to {}, port {}", ipAddress, portNumber);
285 tcp::resolver::query query(tcp::v4(), ipAddress, portNumber, tcp::resolver::query::numeric_host);
289 tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
290 boost::system::error_code error = asio::error::host_not_found;
291 boost::asio::connect(*socket, std::move(endpoint_iterator), error);
298 boost::asio::deadline_timer timer(*
_ioService, boost::posix_time::milliseconds(1));
304 _sockets[acceptorRank] = std::move(socket);
305 send(requesterRank, acceptorRank);
333 socket.second->shutdown(Socket::shutdown_send);
334 socket.second->close();
352 size_t size = itemToSend.
size() + 1;
354 asio::write(*
_sockets[rankReceiver], asio::buffer(&size,
sizeof(
size_t)));
355 asio::write(*
_sockets[rankReceiver], asio::buffer(itemToSend.
c_str(), size));
357 PRECICE_ERROR(
"Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
371 asio::write(*
_sockets[rankReceiver], asio::buffer(itemsToSend.
data(), itemsToSend.
size() *
sizeof(
int)));
373 PRECICE_ERROR(
"Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
384 create_directories(dir);
386 PRECICE_WARN(
"Creating directory for connection info failed with filesystem error: {}", e.
what());
399 PRECICE_WARN(
"Cleaning up connection info failed with filesystem error {}", e.
what());
415 asio::buffer(itemsToSend.
data(), itemsToSend.
size() *
sizeof(
int)),
417 std::static_pointer_cast<SocketRequest>(request)->complete();
432 asio::write(*
_sockets[rankReceiver], asio::buffer(itemsToSend.
data(), itemsToSend.
size() *
sizeof(
double)));
434 PRECICE_ERROR(
"Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
450 asio::buffer(itemsToSend.
data(), itemsToSend.
size() *
sizeof(
double)),
452 std::static_pointer_cast<SocketRequest>(request)->complete();
467 asio::write(*
_sockets[rankReceiver], asio::buffer(&itemToSend,
sizeof(
double)));
469 PRECICE_ERROR(
"Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
475 return aSend(precice::refToSpan<const double>(itemToSend), rankReceiver);
488 asio::write(*
_sockets[rankReceiver], asio::buffer(&itemToSend,
sizeof(
int)));
490 PRECICE_ERROR(
"Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
496 return aSend(precice::refToSpan<const int>(itemToSend), rankReceiver);
509 asio::write(*
_sockets[rankReceiver], asio::buffer(&itemToSend,
sizeof(
bool)));
511 PRECICE_ERROR(
"Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
527 asio::buffer(&itemToSend,
sizeof(
bool)),
529 std::static_pointer_cast<SocketRequest>(request)->complete();
546 asio::read(*
_sockets[rankSender], asio::buffer(&size,
sizeof(
size_t)));
548 asio::read(*
_sockets[rankSender], asio::buffer(msg.
data(), size));
549 itemToReceive = msg.
data();
551 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
565 asio::read(*
_sockets[rankSender], asio::buffer(itemsToReceive.
data(), itemsToReceive.
size() *
sizeof(
int)));
567 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
581 asio::read(*
_sockets[rankSender], asio::buffer(itemsToReceive.
data(), itemsToReceive.
size() *
sizeof(
double)));
583 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
600 asio::async_read(*
_sockets[rankSender],
601 asio::buffer(itemsToReceive.
data(), itemsToReceive.
size() *
sizeof(
double)),
602 [request](boost::system::error_code
const &,
std::size_t) {
603 std::static_pointer_cast<SocketRequest>(request)->complete();
606 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
622 asio::read(*
_sockets[rankSender], asio::buffer(&itemToReceive,
sizeof(
double)));
624 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
630 return aReceive(precice::refToSpan<double>(itemToReceive), rankSender);
643 asio::read(*
_sockets[rankSender], asio::buffer(&itemToReceive,
sizeof(
int)));
645 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
662 asio::async_read(*
_sockets[rankSender],
663 asio::buffer(&itemToReceive,
sizeof(
int)),
664 [request](boost::system::error_code
const &,
std::size_t) {
665 std::static_pointer_cast<SocketRequest>(request)->complete();
668 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
684 asio::read(*
_sockets[rankSender], asio::buffer(&itemToReceive,
sizeof(
bool)));
686 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
702 asio::async_read(*
_sockets[rankSender],
703 asio::buffer(&itemToReceive,
sizeof(
bool)),
704 [request](boost::system::error_code
const &,
std::size_t) {
705 std::static_pointer_cast<SocketRequest>(request)->complete();
708 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.
what());
727 struct if_nameindex *nameInterface = if_nameindex();
728 for (
struct if_nameindex *itNameInterface = nameInterface; itNameInterface->if_index != 0; ++itNameInterface) {
730 interface.index = itNameInterface->if_index;
731 interface.name = itNameInterface->if_name;
734 if_freenameindex(nameInterface);
737 for (
auto &interface : interfaces) {
738 struct ifreq request;
740 interface.name.c_str(),
743 auto socketfd = socket(AF_INET, SOCK_STREAM, 0);
744 if (socketfd == -1) {
747 auto err = ioctl(socketfd, SIOCGIFADDR, &request);
753 const char *addr = inet_ntoa((
reinterpret_cast<struct sockaddr_in *
>(&request.ifr_addr))->sin_addr);
757 interface.address = addr;
775 auto interfaces = detectInterfaces();
778 [&](Interface
const &interface) { return interface.name == _networkName; });
779 if (pos == interfaces.
end()) {
782 err <<
"Cannot find network interface \"" <<
_networkName <<
"\". Available interfaces are: ";
783 for (
const auto &interface : interfaces) {
784 err << interface.name <<
' ';
786 err <<
" Please check \"network\" attributes in your configuration file.";
791 PRECICE_CHECK(not pos->address.empty(),
"The interface \"{}\" does not have an IP address. Please select another interface.",
_networkName);
#define PRECICE_ERROR(...)
#define PRECICE_WARN(...)
#define PRECICE_DEBUG(...)
#define PRECICE_TRACE(...)
#define PRECICE_CHECK(check,...)
#define PRECICE_ASSERT(...)
void setRankOffset(Rank rankOffset)
Set rank offset.
virtual bool isConnected()
Returns true, if a connection to a remote participant has been setup.
virtual int adjustRank(Rank rank) const
Adjusts the given rank bases on the _rankOffset.
Reads the connection info for the given participant/rank information.
std::string read() const
Reads the info from the connection info file. Will block, if the the file is not present.
Writes the connection info for the given participant/rank information.
void write(std::string_view info) const
Write the string info, e.g. IP:port to the connection info file.
Implements Communication by using sockets.
std::string getIpAddress()
virtual PtrRequest aReceive(precice::span< double > itemsToReceive, int rankSender) override
Asynchronously receives an array of double values.
virtual void cleanupEstablishment(std::string const &acceptorName, std::string const &requesterName) override
Clean-up environment used to establish the communication.
virtual void send(std::string const &itemToSend, Rank rankReceiver) override
Sends a std::string to process with given rank.
std::string _addressDirectory
Directory where IP address is exchanged by file.
virtual void acceptConnectionAsServer(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, int acceptorRank, int requesterCommunicatorSize) override
Accepts connection from another communicator, which has to call requestConnectionAsClient().
virtual ~SocketCommunication()
virtual void requestConnectionAsClient(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, std::set< int > const &acceptorRanks, int requesterRank) override
Connects to another communicator, which has to call acceptConnectionAsServer().
std::string _networkName
Name of network to communicate over.
virtual size_t getRemoteCommunicatorSize() override
Returns the number of processes in the remote communicator.
std::map< int, std::shared_ptr< Socket > > _sockets
Remote rank -> socket map.
virtual void receive(std::string &itemToReceive, Rank rankSender) override
Receives a std::string from process with given rank.
std::shared_ptr< Work > _work
boost::asio::io_service IOService
virtual void closeConnection() override
Disconnects from communication space, i.e. participant.
virtual void prepareEstablishment(std::string const &acceptorName, std::string const &requesterName) override
Prepare environment used to establish the communication.
unsigned short _portNumber
Port used for socket connection.
SocketCommunication(unsigned short portNumber=0, bool reuseAddress=false, std::string networkName=utils::networking::loopbackInterfaceName(), std::string addressDirectory=".")
virtual PtrRequest aSend(precice::span< const int > itemsToSend, Rank rankReceiver) override
Asynchronously sends an array of integer values.
virtual void requestConnection(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, int requesterRank, int requesterCommunicatorSize) override
Connects to another communicator, which has to call acceptConnection().
virtual void acceptConnection(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, int acceptorRank, int rankOffset=0) override
Accepts connection from another communicator, which has to call requestConnection().
std::shared_ptr< IOService > _ioService
void dispatch(std::shared_ptr< Socket > sock, boost::asio::const_buffers_1 data, std::function< void()> callback)
Put data in the queue, start processing the queue.
A C++ 11 implementation of the non-owning C++20 std::span type.
constexpr pointer data() const noexcept
constexpr size_type size() const noexcept
T emplace_back(T... args)
T generic_string(T... args)
std::string localDirectory(std::string_view acceptorName, std::string_view requesterName, std::string_view addressDirectory)
contains the data communication abstraction layer.