Example of reading several slices in parallel with a CDMReader.
#include <boost/shared_array.hpp>
#include <boost/interprocess/anonymous_shared_memory.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <unistd.h>
#include <wait.h>
/**
 * Read several slices of one parameter using forked child processes and
 * collect the float values through an anonymous shared-memory region.
 *
 * The region is mapped before forking. Child i fetches every slice j with
 * (j % maxProcs) == i and copies its floats into the region at an offset equal
 * to the summed lengths of all preceding slices. The parent waits for all
 * children and then copies the region into a heap array.
 *
 * @param maxProcs number of child processes that are forked
 * @param reader   reader the children call getScaledDataSliceInUnit() on
 * @param parName  parameter name forwarded to getScaledDataSliceInUnit()
 * @param parUnit  unit forwarded to getScaledDataSliceInUnit()
 * @param slices   the slices to fetch, one entry per slice
 * @return declared as DataPtr.
 *         NOTE(review): no return statement is visible in this listing.
 * @throws runtime_error if a child did not exit normally with status 0
 *
 * NOTE(review): sliceLengths, children, data and ex are used below, but their
 * declarations are not visible in this listing; they appear to have been
 * elided. The notes below only describe what the visible lines do.
 */
DataPtr getParallelScaledDataSliceInUnit(
size_t maxProcs, boost::shared_ptr<CDMReader> reader,
const string& parName,
const string& parUnit,
const vector<SliceBuilder>& slices)
{
// The loop body is empty in this listing; presumably it filled sliceLengths
// with the element count of each slice -- TODO confirm.
// NOTE(review): int index compared against the unsigned slices.size().
for (
int i = 0; i < slices.
size(); i++) {
}
// Total number of floats over all slices.
// NOTE(review): if this is std::accumulate, the literal 0 makes the running
// sum an int, which is only converted to size_t afterwards.
size_t total =
accumulate(sliceLengths.begin(), sliceLengths.end(), 0);
// Shared region sized for all floats; it is created before fork() and is
// written by the children and read by the parent.
boost::interprocess::mapped_region region(boost::interprocess::anonymous_shared_memory(total*sizeof(float)));
pid_t pid;
for (size_t i = 0; i < maxProcs; i++) {
pid = fork();
if(pid < 0) {
// fork failed: print a message and terminate the calling process
printf("Error forking");
exit(1);
} else if (pid == 0) {
// child process number i
assert(region.get_size() == (total*sizeof(float)));
float* regionFloat = reinterpret_cast<float*>(region.get_address());
// offset (in floats) of slice j inside the shared region
size_t startPos = 0;
for (
size_t j = 0; j < slices.
size(); j++) {
// only handle the slices assigned to this child
if ((j % maxProcs) == i) {
// NOTE(review): no catch clause is visible for this try block and ex is
// not declared here; the error output below presumably belonged to an
// elided handler -- TODO confirm.
try {
data = reader->getScaledDataSliceInUnit(parName, parUnit, slices.
at(j));
cerr <<
"error fetching data on '" << parName <<
"', '" << parUnit <<
"' slice " << j <<
": " << ex.
what() <<
endl;
}
boost::shared_array<float> array;
if (data->size() == 0) {
// No data: use a freshly allocated buffer of the slice length.
// NOTE(review): new float[n] leaves the values uninitialised, so
// indeterminate floats are copied into the shared region.
array = boost::shared_array<float>(new float[sliceLengths.at(j)]);
} else {
assert(data->size() == sliceLengths.at(j));
array = data->asFloat();
}
// publish this slice at its position in the shared region
std::copy(array.get(), array.get()+sliceLengths.at(j), regionFloat + startPos);
}
// advance for every slice, handled here or not, so that all children
// compute the same offsets
startPos += sliceLengths.at(j);
}
// leave the child here so it never runs the parent code below
_exit(0);
} else {
// Parent branch, empty in this listing; presumably the pid was stored in
// children here -- TODO confirm.
}
}
// Parent: wait for every child and check its exit status.
// NOTE(review): int index compared against the unsigned maxProcs.
for (int i = 0; i < maxProcs; ++i) {
int status;
// NOTE(review): retries for as long as waitpid() returns -1, whatever errno is.
while (-1 == waitpid(children.
at(i), &status, 0));
if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
throw runtime_error(
"child-process did not finish correctly when fetching data");
// NOTE(review): unreachable, the throw above already leaves the function
exit(1);
}
}
// Copy the gathered values out of the shared region into a heap array.
boost::shared_array<float> allFloats(new float[total]);
assert(region.get_size() == (total*sizeof(float)));
float* regionFloat = reinterpret_cast<float*>(region.get_address());
std::copy(regionFloat, regionFloat+total, allFloats.get());
// NOTE(review): allFloats is not used further in this listing.
}
/**
 * Example driver: builds the slice requests for "air_temperature_2m" in degC
 * and fetches them with getParallelScaledDataSliceInUnit().
 *
 * The number of worker processes is taken from the first command-line
 * argument. If that is missing or 0, the number of online processors reported
 * by sysconf() is used, and 1 if that cannot be determined.
 *
 * @param argc number of command-line arguments
 * @param args command-line arguments; args[1], if present, is parsed as the
 *             number of processes
 *
 * NOTE(review): reader, sb, maxSize, slices and data are used below, but their
 * declarations are not visible in this part of the file.
 */
int main(int argc, char* args[]) {
    // cdm is not used in the visible part of this function
    const CDM& cdm = reader->getCDM();
    string parName = "air_temperature_2m";
    string parUnit = "degC";
    // presumably arguments are (dimension name, start, size) -- TODO confirm
    sb.setStartAndSize("y", 1, 500);
    for (size_t i = 0; i < maxSize; i++) {
        // one "time" step per iteration; nothing else happens with sb in the
        // visible loop body
        sb.setStartAndSize("time", i, 1);
    }

    size_t maxProcs = 0;
    if (argc > 1) {
        maxProcs = string2type<size_t>(args[1]);
    }
    if (maxProcs == 0) {
#ifdef _SC_NPROCESSORS_ONLN
        // sysconf() returns a long and -1 on error; only accept a positive
        // count so an error cannot wrap around to a huge unsigned value.
        const long onlineProcs = sysconf(_SC_NPROCESSORS_ONLN);
        if (onlineProcs > 0) {
            maxProcs = static_cast<size_t>(onlineProcs);
        }
#endif
        if (maxProcs == 0) maxProcs = 1;
    }
    // size_t must not be printed with %d; cast to a type with a matching
    // conversion specifier.
    printf("maxProcs = %lu\n", static_cast<unsigned long>(maxProcs));

    data = getParallelScaledDataSliceInUnit(maxProcs, reader, parName, parUnit, slices);
    boost::shared_array<float> arrayP = data->asFloat();
    // loop body is empty in the visible part of the file
    for (size_t i = 0; i < data->size(); i++) {
    }
}