Skip to content

Commit

Permalink
Merge branch 'hotfix/1.24.4'
Browse files Browse the repository at this point in the history
  • Loading branch information
danovaro committed Aug 3, 2023
2 parents 0c2e958 + f93c597 commit f0a2844
Show file tree
Hide file tree
Showing 30 changed files with 2,644 additions and 11 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.24.3
1.24.4
3 changes: 3 additions & 0 deletions src/eckit/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,8 @@ serialisation/Streamable.cc
serialisation/Streamable.h
)



list( APPEND eckit_persist_srcs
persist/Bless.h
persist/DumpLoad.cc
Expand Down Expand Up @@ -965,6 +967,7 @@ if( HAVE_ECKIT_SQL )
add_subdirectory( sql )
endif()

add_subdirectory( distributed )
add_subdirectory( geometry )
add_subdirectory( linalg )
add_subdirectory( maths )
Expand Down
1 change: 1 addition & 0 deletions src/eckit/container/BTree.cc
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ size_t BTree<K, V, S, L>::count(unsigned long page) const {
return c;
}


template <class K, class V, int S, class L>
void BTree<K, V, S, L>::lockShared() {
L::lockRange(file_.fileno(), 0, 0, F_SETLKW, F_RDLCK);
Expand Down
88 changes: 88 additions & 0 deletions src/eckit/distributed/Actor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* (C) Copyright 1996- ECMWF.
*
* This software is licensed under the terms of the Apache Licence Version 2.0
* which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
* In applying this licence, ECMWF does not waive the privileges and immunities
* granted to it by virtue of its status as an intergovernmental organisation nor
* does it submit to any jurisdiction.
*/


#include "eckit/distributed/Actor.h"
#include "eckit/distributed/Transport.h"

namespace eckit::distributed {

//----------------------------------------------------------------------------------------------------------------------

Actor::~Actor() {
}

Actor::Actor(Transport &transport):
transport_(transport),
title_(transport.title()) {
}

void Actor::sendMessageToNextWorker(const Message &msg) const {
transport_.sendMessageToNextWorker(msg);
}

void Actor::sendStatisticsToProducer(const Message &msg) const {
transport_.sendStatisticsToProducer(msg);
}

void Actor::getNextWorkMessage(Message &message) const {
transport_.getNextWorkMessage(message);
}

void Actor::getNextWriteMessage(Message &message) const {
transport_.getNextWriteMessage(message);
}

void Actor::sendToWriter(int writer, const Message &message) const {
transport_.sendToWriter(writer, message);
}

void Actor::sendShutDownMessage() const {
transport_.sendShutDownMessage(*this);
}

void Actor::messageFromWorker(Message&, int) const {
}

void Actor::messageFromWriter(Message&, int) const {
}

const char* Actor::tagName(int tag) {
switch (tag) {

case READY:
return "READY";

case WORK:
return "WORK";

case SHUTDOWN:
return "SHUTDOWN";

case OPEN:
return "OPEN";

case WRITE:
return "WRITE";

case CLOSE:
return "CLOSE";

case STATISTICS:
return "STATISTICS";

default:
return "UNKNOWN";
}
}
//----------------------------------------------------------------------------------------------------------------------

} // namespace eckit

75 changes: 75 additions & 0 deletions src/eckit/distributed/Actor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* (C) Copyright 1996- ECMWF.
*
* This software is licensed under the terms of the Apache Licence Version 2.0
* which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
* In applying this licence, ECMWF does not waive the privileges and immunities
* granted to it by virtue of its status as an intergovernmental organisation nor
* does it submit to any jurisdiction.
*/

/// @file Actor.h
/// @author Baudouin Raoult
/// @author Tiago Quintino
/// @date May 2016

#pragma once

#include <string>

#include "eckit/memory/NonCopyable.h"


namespace eckit::distributed {

class Transport;
class Message;

//----------------------------------------------------------------------------------------------------------------------

class Actor : private eckit::NonCopyable {
public:

enum MessageTags {
READY,
WORK,
SHUTDOWN,
OPEN,
WRITE,
CLOSE,
STATISTICS,
BYE
};

public: // methods

Actor(Transport &transport);
virtual ~Actor();

virtual void run() = 0;
virtual void finalise() = 0;

virtual void messageFromWorker(Message &message, int worker) const;
virtual void messageFromWriter(Message &message, int worker) const;
virtual void sendStatisticsToProducer(const Message &message) const;

virtual void sendMessageToNextWorker(const Message &message) const;
virtual void getNextWorkMessage(Message &message) const;
virtual void getNextWriteMessage(Message &message) const;
virtual void sendToWriter(int writer, const Message &message) const;

virtual void sendShutDownMessage() const;

static const char* tagName(int);

protected: // members

Transport& transport_;
std::string title_;

};


//----------------------------------------------------------------------------------------------------------------------

} // namespace eckit
41 changes: 41 additions & 0 deletions src/eckit/distributed/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
list( APPEND eckit_distributed_srcs
Actor.cc
Actor.h
Consumer.cc
Consumer.h
Message.cc
Message.h
NoTransport.cc
NoTransport.h
Producer.cc
Producer.h
Transport.cc
Transport.h
TransportHandle.cc
TransportHandle.h
TransportStatistics.cc
TransportStatistics.h
tcp/TCPTransport.cc
tcp/TCPTransport.h
)

if( HAVE_MPI )

list( APPEND eckit_distributed_srcs
mpi/MPITransport.cc
mpi/MPITransport.h
)
endif()

ecbuild_add_library( TARGET eckit_distributed TYPE SHARED
INSTALL_HEADERS ALL
HEADER_DESTINATION
${INSTALL_INCLUDE_DIR}/eckit/distributed
SOURCES
${eckit_distributed_srcs}
PUBLIC_LIBS
eckit )

if ( HAVE_MPI )
target_link_libraries( eckit_distributed PUBLIC eckit_mpi )
endif()
83 changes: 83 additions & 0 deletions src/eckit/distributed/Consumer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* (C) Copyright 1996- ECMWF.
*
* This software is licensed under the terms of the Apache Licence Version 2.0
* which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
* In applying this licence, ECMWF does not waive the privileges and immunities
* granted to it by virtue of its status as an intergovernmental organisation nor
* does it submit to any jurisdiction.
*/

#include <unistd.h>

#include "eckit/log/Log.h"
#include "eckit/log/TimeStamp.h"
#include "eckit/log/ResourceUsage.h"

#include "eckit/distributed/Consumer.h"
#include "eckit/distributed/Message.h"
#include "eckit/distributed/Transport.h"

using eckit::Log;

namespace eckit::distributed {

//----------------------------------------------------------------------------------------------------------------------

Consumer::Consumer(Transport &transport):
Actor(transport) {
}

void Consumer::run() {

// eckit::TraceResourceUsage<LibPGen> usage("Message::write()");
// eckit::Log::info() << eckit::TimeStamp() << " " << transport_.title() << " starting " << std::endl;

Message msg;

while (true) {

msg.rewind();
// eckit::Log::info() << eckit::TimeStamp() << " " << transport_.title() << " begin getNextMessage" << std::endl;

getNextMessage(msg);
// eckit::Log::info() << eckit::TimeStamp() << " " << transport_.title() << " end getNextMessage" << std::endl;

if (msg.shutdownRequested()) {
msg.rewind();
shutdown(msg);
sendStatisticsToProducer(msg);
break;
}

try {
// eckit::Log::info() << eckit::TimeStamp() << " " << transport_.title() << " begin consume" << std::endl;

consume(msg);
// eckit::Log::info() << eckit::TimeStamp() << " " << transport_.title() << " end consume" << std::endl;

} catch (eckit::Exception &e) {
eckit::Log::info() << "Failure: " << e.what() << std::endl;
failure(msg);
// eckit::Log::info() << eckit::TimeStamp() << " " << transport_.title() << " end failure" << std::endl;

}
}

// eckit::Log::info() << eckit::TimeStamp() << " " << transport_.title() << " exiting " << std::endl;
transport_.synchronise();

}

void Consumer::shutdown(Message &message) {
message << "OK";
}

void Consumer::failure(Message &message) {

}

//----------------------------------------------------------------------------------------------------------------------

} // namespace eckit

47 changes: 47 additions & 0 deletions src/eckit/distributed/Consumer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* (C) Copyright 1996- ECMWF.
*
* This software is licensed under the terms of the Apache Licence Version 2.0
* which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
* In applying this licence, ECMWF does not waive the privileges and immunities
* granted to it by virtue of its status as an intergovernmental organisation nor
* does it submit to any jurisdiction.
*/

/// @file Consumer.h
/// @author Baudouin Raoult
/// @author Tiago Quintino
/// @date May 2016

#ifndef eckit_Consumer_H
#define eckit_Consumer_H

#include "eckit/distributed/Actor.h"

namespace eckit::distributed {

//----------------------------------------------------------------------------------------------------------------------

class Consumer : public Actor {

virtual void run();

virtual void getNextMessage(Message& message) const = 0;


public: // methods

Consumer(Transport &transport);


virtual void consume(Message& message) = 0;
virtual void failure(Message& message);
virtual void shutdown(Message& message);

};

//----------------------------------------------------------------------------------------------------------------------

} // namespace eckit

#endif
Loading

0 comments on commit f0a2844

Please sign in to comment.