-
Notifications
You must be signed in to change notification settings - Fork 12
/
vlsv_reader_parallel.h
133 lines (114 loc) · 6.54 KB
/
vlsv_reader_parallel.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
/** This file is part of VLSV file format.
*
* Copyright 2011-2016 Finnish Meteorological Institute
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef VLSV_READER_PARALLEL_H
#define VLSV_READER_PARALLEL_H
#include <mpi.h>
#include "vlsv_reader.h"
#include "mpiconversion.h"
#include "multi_io_unit.h"
namespace vlsv {
class ParallelReader: public Reader {
public:
ParallelReader();
~ParallelReader();
bool close();
bool getArrayAttributes(const std::string& tagName,const std::list<std::pair<std::string,std::string> >& attribsIn,
std::map<std::string,std::string>& attribsOut) const;
bool getArrayInfo(const std::string& tagName,const std::list<std::pair<std::string,std::string> >& attribs,
uint64_t& arraySize,uint64_t& vectorSize,datatype::type& dataType,uint64_t& byteSize);
bool getArrayInfoMaster(const std::string& tagName,const std::list<std::pair<std::string,std::string> >& attribs,
uint64_t& arraySize,uint64_t& vectorSize,datatype::type& dataType,uint64_t& dataSize);
uint64_t getBytesRead();
double getReadTime() const;
bool getUniqueAttributeValues(const std::string& tagName,const std::string& attribName,std::set<std::string>& output) const;
bool open(const std::string& fname,MPI_Comm comm,const int& masterRank,MPI_Info mpiInfo=MPI_INFO_NULL);
bool readArrayMaster(const std::string& tagName,const std::list<std::pair<std::string,std::string> >& attribs,
const uint64_t& begin,const uint64_t& amount,char* buffer);
using Reader::readArray;
bool readArray(const std::string& tagName,const std::list<std::pair<std::string,std::string> >& attribs,
const uint64_t& begin,const uint64_t& amount,char* buffer);
bool addMultireadUnit(char* buffer,const uint64_t& amount,const uint64_t& offset=0);
bool endMultiread(const uint64_t& arrayOffset, const bool provideManualOffsets=false);
bool startMultiread(const std::string& tagName,const std::list<std::pair<std::string,std::string> >& attribs);
template<typename T>
bool read(const std::string& tagName,const std::list<std::pair<std::string,std::string> >& attribs,
const uint64_t& begin,const uint64_t& amount,T*& buffer,bool allocateMemory=true);
template<typename T>
bool readParameter(const std::string& parameterName,T& value);
private:
uint64_t bytesRead; /**< Number of bytes read by this process.*/
MPI_Comm comm; /**< MPI communicator used to read the file.*/
MPI_File filePtr; /**< MPI file pointer to input file.*/
int masterRank; /**< MPI rank of master process.*/
bool multireadStarted; /**< If true, multiread mode has been initialized successfully.*/
int myRank; /**< MPI rank of this process in communicator comm.*/
bool parallelFileOpen; /**< If true, all processes have opened input file successfully.*/
int processes; /**< Number of MPI processes in communicator comm.*/
double readTime; /**< Time spent in seconds to read bytesRead bytes by this process.*/
std::list<Multi_IO_Unit> multiReadUnits;
bool getArrayInfo(const std::string& tagName,const std::list<std::pair<std::string,std::string> >& attribs);
bool flushMultiread(const size_t& unit,const MPI_Offset& currentOffset,std::list<Multi_IO_Unit>::iterator& start,std::list<Multi_IO_Unit>::iterator& stop);
};
template<typename T>
bool ParallelReader::read(const std::string& tagName,const std::list<std::pair<std::string,std::string> >& attribs,
const uint64_t& begin,const uint64_t& amount,T*& outBuffer,bool allocateMemory) {
// Get array info to all processes:
if (ParallelReader::getArrayInfo(tagName,attribs) == false) {
return false;
}
// Check that requested read is inside the array:
if (begin > arrayOpen.arraySize || (begin+amount) > arrayOpen.arraySize) return false;
char* buffer = new char[amount*arrayOpen.vectorSize*arrayOpen.dataSize];
if (ParallelReader::readArray(tagName,attribs,begin,amount,buffer) == false) {
delete [] buffer; buffer = NULL;
return false;
}
// Copy data from temporary buffer to output array:
if (allocateMemory == true) outBuffer = new T[amount*arrayOpen.vectorSize];
char* ptr = buffer;
for (uint64_t i=0; i<amount; ++i) {
for (uint64_t j=0; j<arrayOpen.vectorSize; ++j) {
convertValue<T>(outBuffer[i*arrayOpen.vectorSize+j],ptr,arrayOpen.dataType,arrayOpen.dataSize,false);
ptr += arrayOpen.dataSize;
}
}
delete [] buffer; buffer = NULL;
return true;
}
/** Read the value of a parameter. All processes must call this function simultaneously.
* @param parameterName Name of the parameter. Only significant on master process.
* @param value Variable where parameter's value will be written. Will be the same value on all processes upon successful exit.
* @return If true, parameter was successfully read. All processes return the same value.*/
template<typename T> inline
bool ParallelReader::readParameter(const std::string& parameterName,T& value) {
bool success = true;
// Master process reads parameter value:
if (myRank == masterRank) {
if (Reader::readParameter(parameterName,value) == false) success = false;
}
// Master process broadcasts parameter value and
// value of variable 'status' to all other processes:
uint8_t masterSuccess = 0;
if (success == true) masterSuccess = 1;
MPI_Bcast(&value,1,MPI_Type<T>(),masterRank,comm);
MPI_Bcast(&masterSuccess,1,MPI_Type<uint8_t>(),masterRank,comm);
success = (masterSuccess > 0);
return success;
}
}
#endif