forked from fmihpc/vlsv
-
Notifications
You must be signed in to change notification settings - Fork 0
/
vlsv_reader_parallel.cpp
541 lines (463 loc) · 23.4 KB
/
vlsv_reader_parallel.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
/** This file is part of VLSV file format.
*
* Copyright 2011-2013,2015 Finnish Meteorological Institute
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <cstdlib>
#include <iostream>
#include <string.h>
#include "vlsv_common_mpi.h"
#include "vlsv_reader_parallel.h"
using namespace std;
namespace vlsv {
/** Default constructor for class ParallelReader.*/
ParallelReader::ParallelReader(): Reader() {
multireadStarted = false;
}
/** Destructor for class ParallelReader. It closes the input file (if still open).*/
ParallelReader::~ParallelReader() {
close();
}
/** Add a multi-read unit. Note that startMmultiread must have been called
* to initialize multi-read mode before calling this function. The data is not
* actually read until endMultiread is called.
* @param buffer Pointer to allocated memory location where data from file is placed.
* @param arrayElements Number of array elements to read.
* @return If true, multiread unit was added successfully.
* @see startMultiread.
* @see endMultiread.*/
bool ParallelReader::addMultireadUnit(char* buffer,const uint64_t& arrayElements) {
bool success = true;
if (multireadStarted == false) return false;
// Ignore zero size reads:
if (arrayElements == 0) return true;
// Get the byte size of the MPI primitive datatype (MPI_INT etc.) used here:
int datatypeBytesize;
MPI_Type_size(getMPIDatatype(arrayOpen.dataType,arrayOpen.dataSize),&datatypeBytesize);
// Calculate the maximum number of array elements written using a single multi-write.
// Note: element = vector of size vectorSize, each vector element has byte size of datatypeBytesize.
const size_t maxElementsPerRead = getMaxBytesPerRead() / (datatypeBytesize*arrayOpen.vectorSize);
// Split the multi-read if the array has more elements than what we can
// read from output file using a single MPI collective:
if (arrayElements > maxElementsPerRead) {
// Calculate how many collectives this process needs:
size_t N_reads = arrayElements / maxElementsPerRead;
if (arrayElements % maxElementsPerRead != 0) ++N_reads;
// Add N_reads multi-read units:
for (size_t i=0; i<N_reads; ++i) {
size_t elements = maxElementsPerRead;
if ((i+1)*maxElementsPerRead >= arrayElements) elements = arrayElements - i*maxElementsPerRead;
const size_t byteOffset = maxElementsPerRead*arrayOpen.vectorSize*datatypeBytesize;
multiReadUnits.push_back(
Multi_IO_Unit(buffer+i*byteOffset,
getMPIDatatype(arrayOpen.dataType,arrayOpen.dataSize),
elements*arrayOpen.vectorSize));
}
} else {
multiReadUnits.push_back(
Multi_IO_Unit(buffer,
getMPIDatatype(arrayOpen.dataType,arrayOpen.dataSize),
arrayElements*arrayOpen.vectorSize));
}
return success;
}
/** End multi-read mode and read data from file on all processes.
* Note that startMultiread and one or more addMultireadUnit calls must have
* been made before calling this function.
* @param arrayOffset Offset into input array relative to array start on file.
* Determines where this process starts to read its data. Offset is given in
* units of array elements.
* @return If true, all processes read their data successfully.
* @see startMultiread.
* @see addMultireadUnit.*/
bool ParallelReader::endMultiread(const uint64_t& arrayOffset) {
bool success = true;
if (multireadStarted == false) success = false;
if (checkSuccess(success,comm) == false) return false;
// Calculate how many collective MPI calls are needed to
// read all the data from input file:
size_t inputBytesize = 0;
size_t myCollectiveCalls = 0;
if (multiReadUnits.size() > 0) myCollectiveCalls = 1;
vector<pair<list<Multi_IO_Unit>::iterator,list<Multi_IO_Unit>::iterator> > multireadList;
list<Multi_IO_Unit>::iterator first = multiReadUnits.begin();
list<Multi_IO_Unit>::iterator last = multiReadUnits.begin();
for (list<Multi_IO_Unit>::iterator it=multiReadUnits.begin(); it!=multiReadUnits.end(); ++it) {
if (inputBytesize + it->amount*arrayOpen.dataSize > getMaxBytesPerRead()) {
multireadList.push_back(make_pair(first,last));
first = it; last = it;
inputBytesize = 0;
++myCollectiveCalls;
}
inputBytesize += it->amount*arrayOpen.dataSize;
++last;
}
multireadList.push_back(make_pair(first,last));
// Reduce the maximum number of needed collective reads to all processes:
size_t N_collectiveCalls;
MPI_Allreduce(&myCollectiveCalls,&N_collectiveCalls,1,MPI_Type<size_t>(),MPI_MAX,comm);
// If more collective calls are made than what this process needs,
// insert dummy reads to the end of multireadList:
if (N_collectiveCalls > multireadList.size()) {
const size_t N_dummyCalls = N_collectiveCalls-multireadList.size();
for (size_t i=0; i<N_dummyCalls; ++i) {
multireadList.push_back(make_pair(multiReadUnits.end(),multiReadUnits.end()));
}
}
MPI_Offset unitOffset = 0;
unitOffset = arrayOpen.offset; // Offset of array start relative to file start
unitOffset += arrayOffset*arrayOpen.vectorSize*arrayOpen.dataSize; // Byte offset relative to array start where
// this process starts to read data from.
for (size_t i=0; i<multireadList.size(); ++i) {
if (flushMultiread(i,unitOffset,multireadList[i].first,multireadList[i].second) == false) success = false;
for (std::list<Multi_IO_Unit>::iterator it=multireadList[i].first; it!=multireadList[i].second; ++it) {
unitOffset += it->amount*arrayOpen.dataSize;
}
}
multireadStarted = false;
return checkSuccess(success,comm);
}
/** Close the input file.
* @return If true, the input file was closed successfully.
* @see vlsv::ParallelReader::open().*/
bool ParallelReader::close() {
multireadStarted = false;
if (parallelFileOpen == true) {
MPI_File_close(&filePtr);
parallelFileOpen = false;
}
if (myRank == masterRank) filein.close();
return true;
}
/** Get the XML attributes for the given array on all processes. The file
* footer is read by master process only, who then broadcasts the contents
* to all processes. This function returns the same value on all processes.
* @param tagName Name of the array's XML tag.
* @param attribsIn XML tag attributes that uniquely define the array.
* @param attribsOut XML tag attributes read from the input file.
* @return If true, array attributes were read successfully.*/
bool ParallelReader::getArrayAttributes(const std::string& tagName,const std::list<std::pair<std::string,std::string> >& attribsIn,
std::map<std::string,std::string>& attribsOut) const {
bool success = true;
// Master process reads footer:
if (myRank == masterRank) {
success = Reader::getArrayAttributes(tagName,attribsIn,attribsOut);
}
// Check that master process read array attributes correctly:
int globalSuccess = 0;
if (success == true) globalSuccess = 1;
MPI_Bcast(&globalSuccess,1,MPI_Type<int>(),masterRank,comm);
if (globalSuccess == 0) return false;
// Master process distributes contents of map attribsOut to all processes:
// Broadcast number of entries in attribsOut:
size_t N_attribs = 0;
if (myRank == masterRank) N_attribs = attribsOut.size();
MPI_Bcast(&N_attribs,1,MPI_Type<size_t>(),masterRank,comm);
const unsigned int maxLength = 512;
char attribName[maxLength];
char attribValue[maxLength];
if (myRank == masterRank) {
for (map<string,string>::const_iterator it=attribsOut.begin(); it!=attribsOut.end(); ++it) {
strncpy(attribName,it->first.c_str(),maxLength-1);
strncpy(attribValue,it->second.c_str(),maxLength-1);
MPI_Bcast(attribName,maxLength,MPI_Type<char>(),masterRank,comm);
MPI_Bcast(attribValue,maxLength,MPI_Type<char>(),masterRank,comm);
}
} else {
for (size_t i=0; i<N_attribs; ++i) {
MPI_Bcast(attribName,maxLength,MPI_Type<char>(),masterRank,comm);
MPI_Bcast(attribValue,maxLength,MPI_Type<char>(),masterRank,comm);
attribsOut[attribName] = attribValue;
}
}
return success;
}
bool ParallelReader::getArrayInfoMaster(const std::string& tagName,const std::list<std::pair<std::string,std::string> >& attribs,
uint64_t& arraySize,uint64_t& vectorSize,datatype::type& dataType,uint64_t& dataSize) {
if (myRank != masterRank) {
cerr << "(PARALLEL READER): getArrayInfoMaster called on process #" << myRank << endl;
exit(1);
}
return Reader::getArrayInfo(tagName,attribs,arraySize,vectorSize,dataType,dataSize);
}
/** Read array metadata to all processes.
* @param tagName Name of the XML tag corresponding to the array.
* @param attribs A list where the array attribute,value pairs are copied.
* @return If true, array metadata was read successfully.*/
bool ParallelReader::getArrayInfo(const std::string& tagName,const std::list<std::pair<std::string,std::string> >& attribs) {
bool success = true;
if (myRank == masterRank) {
success = Reader::loadArray(tagName,attribs);
}
// Check that master read array info correctly:
int globalSuccess = 0;
if (success == true) globalSuccess = 1;
MPI_Bcast(&globalSuccess,1,MPI_Type<int>(),masterRank,comm);
if (globalSuccess == 0) success = false;
if (success == false) return false;
// Master broadcasts array info to all processes:
MPI_Bcast(&arrayOpen.offset, 1,MPI_Type<streamoff>(),masterRank,comm);
MPI_Bcast(&arrayOpen.arraySize, 1,MPI_Type<uint64_t>(), masterRank,comm);
MPI_Bcast(&arrayOpen.vectorSize,1,MPI_Type<uint64_t>(), masterRank,comm);
MPI_Bcast(&arrayOpen.dataType, 1,MPI_Type<int>(), masterRank,comm);
MPI_Bcast(&arrayOpen.dataSize, 1,MPI_Type<uint64_t>(), masterRank,comm);
return success;
}
bool ParallelReader::getArrayInfo(const std::string& tagName,const std::list<std::pair<std::string,std::string> >& attribs,
uint64_t& arraySize,uint64_t& vectorSize,datatype::type& dataType,uint64_t& byteSize) {
if (getArrayInfo(tagName,attribs) == false) return false;
// Copy values to output variables:
arraySize = arrayOpen.arraySize;
vectorSize = arrayOpen.vectorSize;
dataType = arrayOpen.dataType;
byteSize = arrayOpen.dataSize;
return true;
}
/** Get the amount of bytes read from input file so far. On master process this
* function returns the total number of bytes read by all processes. On other
* processes the returned value is the number of bytes read by that process.
* This function must be called simultaneously by all processes in the communicator.
* @return Number of bytes read.*/
uint64_t ParallelReader::getBytesRead() {
uint64_t totalBytesRead;
MPI_Reduce(&bytesRead,&totalBytesRead,1,MPI_Type<uint64_t>(),MPI_SUM,masterRank,comm);
return totalBytesRead;
}
/** Get the time spent by this process in file I/O. The approximate data rate is
* getBytesRead() / getReadTime() when calculated on master process in the communicator
* used to read the file.
* @return Time in seconds spent in file I/O.*/
double ParallelReader::getReadTime() const {
return readTime;
}
/** Get unique XML attribute values for given tag name. This function
* must be called by all processes simultaneously.
* @param tagName Name of the XML tag.
* @param attribName Name of the attribute.
* @param output Unique attribute values are inserted here.
* @return If true, attribute values were read successfully.*/
bool ParallelReader::getUniqueAttributeValues(const std::string& tagName,const std::string& attribName,
std::set<std::string>& output) const {
bool success = true;
// First the master process reads unique attribute values:
if (myRank == masterRank) {
success = Reader::getUniqueAttributeValues(tagName,attribName,output);
}
// Check that master read array info correctly:
uint8_t masterSuccess = 0;
if (success == true) masterSuccess = 1;
MPI_Bcast(&masterSuccess,1,MPI_Type<uint8_t>(),masterRank,comm);
if (masterSuccess == 0) success = false;
if (success == false) return false;
// Master broadcasts number of entries in set 'output':
size_t N_entries = output.size();
MPI_Bcast(&N_entries,1,MPI_Type<size_t>(),masterRank,comm);
// Master broadcasts values in set 'output' to all other processes
// who insert the received values to set 'output':
const unsigned int maxLength = 512;
char attribValue[maxLength];
if (myRank == masterRank) {
for (set<string>::const_iterator it=output.begin(); it!=output.end(); ++it) {
strncpy(attribValue,it->c_str(),maxLength-1);
MPI_Bcast(attribValue,maxLength,MPI_Type<char>(),masterRank,comm);
}
} else {
for (size_t i=0; i<N_entries; ++i) {
MPI_Bcast(attribValue,maxLength,MPI_Type<char>(),masterRank,comm);
output.insert(attribValue);
}
}
return success;
}
bool ParallelReader::flushMultiread(const size_t& unit,const MPI_Offset& fileOffset,
std::list<Multi_IO_Unit>::iterator& start,std::list<Multi_IO_Unit>::iterator& stop) {
bool success = true;
// Count the number of multi-read units read:
size_t N_multiReadUnits = 0;
for (list<Multi_IO_Unit>::iterator it=start; it!=stop; ++it) {
++N_multiReadUnits;
}
// Create an MPI datatype for reading all units with a single collective call:
int* blockLengths = new int[N_multiReadUnits];
MPI_Aint* displacements = new MPI_Aint[N_multiReadUnits];
MPI_Datatype* datatypes = new MPI_Datatype[N_multiReadUnits];
char* multireadOffsetPointer = NULL;
if (N_multiReadUnits > 0) multireadOffsetPointer = start->array;
// Copy pointers etc. to MPI struct:
size_t i=0;
size_t amount = 0;
for (list<Multi_IO_Unit>::iterator it=start; it!=stop; ++it) {
blockLengths[i] = it->amount;
displacements[i] = it->array - multireadOffsetPointer;
datatypes[i] = it->mpiType;
int datatypeBytesize;
MPI_Type_size(it->mpiType,&datatypeBytesize);
amount += it->amount * datatypeBytesize;
++i;
}
// Read data from file:
if (N_multiReadUnits > 0) {
// Create an MPI struct containing the multiread units:
MPI_Datatype inputType;
MPI_Type_create_struct(N_multiReadUnits,blockLengths,displacements,datatypes,&inputType);
MPI_Type_commit(&inputType);
// Read data from output file with a single collective call:
const double t_start = MPI_Wtime();
MPI_File_read_at_all(filePtr,fileOffset,multireadOffsetPointer,1,inputType,MPI_STATUS_IGNORE);
readTime += (MPI_Wtime() - t_start);
MPI_Type_free(&inputType);
bytesRead += amount;
} else {
// Process has no data to read but needs to participate in the collective call to prevent deadlock:
const double t_start = MPI_Wtime();
MPI_File_read_at_all(filePtr,fileOffset,NULL,0,MPI_BYTE,MPI_STATUS_IGNORE);
readTime += (MPI_Wtime() - t_start);
}
delete [] blockLengths;
delete [] displacements;
delete [] datatypes;
return success;
}
/** Open a VLSV file for parallel reading.
* @param fname Name of the VLSV file.
* @param comm MPI communicator used in collective MPI operations.
* @param masterRank MPI rank of master process.
* @param mpiInfo Additional MPI info for optimizing file I/O.
* @return If true, VLSV file was opened successfully.*/
bool ParallelReader::open(const std::string& fname,MPI_Comm comm,const int& masterRank,MPI_Info mpiInfo) {
bool success = true;
this->comm = comm;
this->masterRank = masterRank;
MPI_Comm_rank(comm,&myRank);
MPI_Comm_size(comm,&processes);
multireadStarted = false;
// Attempt to open the given input file using MPI:
fileName = fname;
int accessMode = MPI_MODE_RDONLY;
if (MPI_File_open(comm,const_cast<char*>(fileName.c_str()),accessMode,mpiInfo,&filePtr) != MPI_SUCCESS) success = false;
else parallelFileOpen = true;
if (success == false) cerr << "Failed to open parallel file" << endl;
// Only master process reads file footer and endianness. This is done using
// VLSVReader open() member function:
if (myRank == masterRank) {
if (Reader::open(fname) == false) success = false;
}
if (success == false) cerr << "MASTER failed to open VLSV file" << endl;
// Check that all processes have opened the file successfully:
unsigned char globalSuccess = 0;
if (success == true) globalSuccess = 1;
unsigned char* results = new unsigned char[processes];
MPI_Allgather(&globalSuccess,1,MPI_Type<unsigned char>(),results,1,MPI_Type<unsigned char>(),comm);
for (int i=0; i<processes; ++i) if (results[i] == 0) success = false;
delete [] results; results = NULL;
if (success == false) return success;
// Broadcast file endianness to all processes:
MPI_Bcast(&endiannessFile,1,MPI_Type<unsigned char>(),masterRank,comm);
bytesRead = 0;
return success;
}
bool ParallelReader::readArrayMaster(const std::string& tagName,const std::list<std::pair<std::string,std::string> >& attribs,
const uint64_t& begin,const uint64_t& amount,char* buffer) {
if (myRank != masterRank) {
cerr << "(PARALLEL READER) readArrayMaster erroneously called on process #" << myRank << endl;
exit(1);
}
// readArray reads offset from XML tree into master only
return Reader::readArray(tagName,attribs,begin,amount,buffer);
}
/** Read data from an array in VLSV file using collective MPI file I/O operations.
* XML tag name and contents of list 'attribs' need to uniquely define the array.
* @param tagName Array XML tag name.
* @param attribs Additional attributes limiting array search.
* @param begin First array element read, i.e. this process' offset into the array.
* @param amount Number of array elements to read.
* @param buffer Buffer in which data is read from VLSV file.
* @return If true, this process read its data successfully.*/
bool ParallelReader::readArray(const std::string& tagName,const std::list<std::pair<std::string,std::string> >& attribs,
const uint64_t& begin,const uint64_t& amount,char* buffer) {
bool success = true;
// Fetch array info to all processes:
if (getArrayInfo(tagName,attribs) == false) return false;
const MPI_Offset start = arrayOpen.offset + begin*arrayOpen.vectorSize*arrayOpen.dataSize;
const size_t readBytes = amount*arrayOpen.vectorSize*arrayOpen.dataSize;
// If readBytes is larger than getMaxBytesPerRead() this process needs
// more than one collective call to read in all the data.
const size_t maxBytes = getMaxBytesPerRead();
size_t myExtraCollectiveReads = readBytes / maxBytes;
// There's always at least one read:
++myExtraCollectiveReads;
// Reduce the max number of required collective calls to all
// processes to prevent deadlock:
size_t globalExtraCollectiveReads;
MPI_Allreduce(&myExtraCollectiveReads,&globalExtraCollectiveReads,1,MPI_Type<size_t>(),MPI_MAX,comm);
// Read data:
const double t_start = MPI_Wtime();
size_t offset = 0;
for (size_t counter=0; counter<globalExtraCollectiveReads; ++counter) {
char* pos;
size_t readSize;
if (counter < (myExtraCollectiveReads-1)) {
pos = buffer + counter*maxBytes;
readSize = maxBytes;
} else if (counter == (myExtraCollectiveReads-1)) {
// This is the last read operation for this process
pos = buffer + counter*maxBytes;
readSize = readBytes - counter*maxBytes;
} else {
// Nothing left to read
pos = NULL;
readSize = 0;
}
MPI_Status status;
if (MPI_File_read_at_all(filePtr,start+counter*maxBytes,pos,readSize,MPI_BYTE,&status) != MPI_SUCCESS) {
success = false;
}
// Check that we got everything we requested:
int bytesReceived;
MPI_Get_count(&status,MPI_BYTE,&bytesReceived);
if (bytesReceived != readSize) {
stringstream ss;
ss << "ERROR in vlsv::ParallelReader! I only got " << bytesReceived << "/" << readSize;
ss << " bytes in " << __FILE__ << ":" << __LINE__ << endl;
cerr << ss.str();
success = false;
}
offset += readSize;
}
readTime += (MPI_Wtime() - t_start);
bytesRead += amount*arrayOpen.vectorSize*arrayOpen.dataSize;
return checkSuccess(success,comm);
}
/** Start multi-read mode. In multi-read mode processes add zero or more file I/O units
* that define the data that is read from an array in VLSV file, and where it is placed in memory.
* File I/O units are defined by calling addMultireadUnit. Data is not actually read until
* endMultiread is called. XML tag name and contents of list 'attribs' need to uniquely
* define the array.
* @param tagName Array XML tag name in VLSV file.
* @param attribs Additional attributes that uniquely define the array in file.
* @return If true, multi-read mode was started successfully.
* @see addMultireadUnit.
* @see endMultiread.*/
bool ParallelReader::startMultiread(const std::string& tagName,const std::list<std::pair<std::string,std::string> >& attribs) {
if (parallelFileOpen == false) return false;
bool success = true;
multiReadUnits.clear();
if (getArrayInfo(tagName,attribs) == false) {
return false;
}
if (success == true) multireadStarted = true;
return success;
}
} // namespace vlsv