IOR
ior.c
Go to the documentation of this file.
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  */
4 /******************************************************************************\
5 * *
6 * Copyright (c) 2003, The Regents of the University of California *
7 * See the file COPYRIGHT for a complete copyright notice and license. *
8 * *
9 \******************************************************************************/
10 
11 #ifdef HAVE_CONFIG_H
12 # include "config.h"
13 #endif
14 
15 #include <stdio.h>
16 #include <stdlib.h>
17 #include <unistd.h>
18 #include <ctype.h> /* tolower() */
19 #include <errno.h>
20 #include <math.h>
21 #include <mpi.h>
22 #include <string.h>
23 
24 #if defined(HAVE_STRINGS_H)
25 #include <strings.h>
26 #endif
27 
28 #include <sys/stat.h> /* struct stat */
29 #include <time.h>
30 
31 #ifndef _WIN32
32 # include <sys/time.h> /* gettimeofday() */
33 # include <sys/utsname.h> /* uname() */
34 #endif
35 
36 #include <assert.h>
37 
38 #include "ior.h"
39 #include "ior-internal.h"
40 #include "aiori.h"
41 #include "utilities.h"
42 #include "parse_options.h"
43 
44 #define IOR_NB_TIMERS 6
45 
46 /* file scope globals */
47 extern char **environ;
48 static int totalErrorCount;
49 static const ior_aiori_t *backend;
50 
/* Forward declarations of file-local (static) helpers. */
51 static void DestroyTests(IOR_test_t *tests_head);
52 static char *PrependDir(IOR_param_t *, char *);
53 static char **ParseFileName(char *, int *);
54 static void InitTests(IOR_test_t * , MPI_Comm);
55 static void TestIoSys(IOR_test_t *);
56 static void ValidateTests(IOR_param_t *);
/* NOTE(review): the listing jumps from line 56 to 58; the two lines below are
 * the tail of a prototype whose first line (return type and name) is not
 * visible here -- restore it from the original file. */
58  void *fd, const int access,
59  IOR_io_buffers *ioBuffers);
60 
/*
 * Run all tests described by the command line on an existing communicator.
 *
 * Log and result output both go to world_out and world_com becomes
 * mpi_comm_world.  MPI is neither initialized nor finalized here.
 * Returns the head of the test list; the list is not destroyed in this
 * function, so presumably the caller owns it (confirm against callers).
 */
61 IOR_test_t * ior_run(int argc, char **argv, MPI_Comm world_com, FILE * world_out){
62  IOR_test_t *tests_head;
63  IOR_test_t *tptr;
64  out_logfile = world_out;
65  out_resultfile = world_out;
66  mpi_comm_world = world_com;
67 
68  MPI_CHECK(MPI_Comm_rank(mpi_comm_world, &rank), "cannot get rank");
69 
70  /* setup tests, and validate parameters */
71  tests_head = ParseCommandLine(argc, argv);
72  InitTests(tests_head, world_com);
73  verbose = tests_head->params.verbose;
74 
75  PrintHeader(argc, argv);
76 
77  /* perform each test */
78  for (tptr = tests_head; tptr != NULL; tptr = tptr->next) {
/* the backend is initialized and finalized once per test */
79  aiori_initialize(tptr);
/* the error counter is restarted for every test */
80  totalErrorCount = 0;
81  verbose = tptr->params.verbose;
82  backend = tptr->params.backend;
83  if (rank == 0 && verbose >= VERBOSE_0) {
84  ShowTestStart(&tptr->params);
85  }
86  TestIoSys(tptr);
/* NOTE(review): listing line 87 is missing between TestIoSys() and
 * ShowTestEnd() -- restore it from the original file. */
88  ShowTestEnd(tptr);
89  aiori_finalize(tptr);
90  }
91 
92  PrintLongSummaryAllTests(tests_head);
93 
94  /* display finish time */
95  PrintTestEnds();
96  return tests_head;
97 }
98 
99 
100 
101 int ior_main(int argc, char **argv)
102 {
103  IOR_test_t *tests_head;
104  IOR_test_t *tptr;
105 
106  out_logfile = stdout;
107  out_resultfile = stdout;
108 
109  /*
110  * check -h option from commandline without starting MPI;
111  */
112  tests_head = ParseCommandLine(argc, argv);
113 
114  /* start the MPI code */
115  MPI_CHECK(MPI_Init(&argc, &argv), "cannot initialize MPI");
116 
117  mpi_comm_world = MPI_COMM_WORLD;
118  MPI_CHECK(MPI_Comm_rank(mpi_comm_world, &rank), "cannot get rank");
119 
120  /* set error-handling */
121  /*MPI_CHECK(MPI_Errhandler_set(mpi_comm_world, MPI_ERRORS_RETURN),
122  "cannot set errhandler"); */
123 
124  /* setup tests, and validate parameters */
125  InitTests(tests_head, mpi_comm_world);
126  verbose = tests_head->params.verbose;
127 
128  aiori_initialize(tests_head); // this is quite suspicious, likely an error when multiple tests need to be executed with different backends and options
129 
130  PrintHeader(argc, argv);
131 
132  /* perform each test */
133  for (tptr = tests_head; tptr != NULL; tptr = tptr->next) {
134  verbose = tptr->params.verbose;
135  backend = tptr->params.backend;
136  if (rank == 0 && verbose >= VERBOSE_0) {
137  backend = tptr->params.backend;
138  ShowTestStart(&tptr->params);
139  }
140 
141  // This is useful for trapping a running MPI process. While
142  // this is sleeping, run the script 'testing/hdfs/gdb.attach'
143  if (verbose >= VERBOSE_4) {
144  fprintf(out_logfile, "\trank %d: sleeping\n", rank);
145  sleep(5);
146  fprintf(out_logfile, "\trank %d: awake.\n", rank);
147  }
148 
149  TestIoSys(tptr);
150  ShowTestEnd(tptr);
151  }
152 
153  if (verbose < 0)
154  /* always print final summary */
155  verbose = 0;
156  PrintLongSummaryAllTests(tests_head);
157 
158  /* display finish time */
159  PrintTestEnds();
160 
161  aiori_finalize(tests_head);
162 
163  MPI_CHECK(MPI_Finalize(), "cannot finalize MPI");
164 
165  DestroyTests(tests_head);
166 
167  return totalErrorCount;
168 }
169 
170 /***************************** F U N C T I O N S ******************************/
171 
172 /*
173  * Initialize an IOR_param_t structure to the defaults
174  */
176 {
/* NOTE(review): the listing jumps from line 174 to 176, so the function
 * signature is not visible here; the body writes through a pointer named p. */
177  const char *default_aiori = aiori_default ();
178  char *hdfs_user;
179 
180  assert (NULL != default_aiori);
181 
/* start from an all-zero structure; only non-zero defaults are set below */
182  memset(p, 0, sizeof(IOR_param_t));
183 
/* NOTE(review): listing lines 184-185 are missing here. */
186 
/* these three strings are heap copies made with strdup() */
187  p->api = strdup(default_aiori);
188  p->platform = strdup("HOST(OSTYPE)");
189  p->testFileName = strdup("testFile");
190 
191  p->writeFile = p->readFile = FALSE;
192  p->checkWrite = p->checkRead = FALSE;
193 
194  /*
195  * These can be overridden from the command-line but otherwise will be
196  * set from MPI.
197  */
198  p->numTasks = -1;
199  p->numNodes = -1;
200  p->numTasksOnNode0 = -1;
201 
202  p->repetitions = 1;
203  p->repCounter = -1;
204  p->open = WRITE;
205  p->taskPerNodeOffset = 1;
206  p->segmentCount = 1;
207  p->blockSize = 1048576;
208  p->transferSize = 262144;
209  p->randomSeed = -1;
210  p->incompressibleSeed = 573;
/* NOTE(review): listing line 211 is missing here. */
212  p->setAlignment = 1;
213  p->lustre_start_ost = -1;
214 
/* HDFS user defaults to $USER, or the empty string when USER is unset */
215  hdfs_user = getenv("USER");
216  if (!hdfs_user)
217  hdfs_user = "";
218  p->hdfs_user = strdup(hdfs_user);
/* unlike the strdup()ed fields, this points at a string literal */
219  p->hdfs_name_node = "default";
220  p->hdfs_name_node_port = 0; /* ??? */
221  p->hdfs_fs = NULL;
222  p->hdfs_replicas = 0; /* invokes the default */
223  p->hdfs_block_size = 0;
224 
225  p->URI = NULL;
226  p->part_number = 0;
227 
228  p->beegfs_numTargets = -1;
229  p->beegfs_chunkSize = -1;
230 }
231 
/*
 * Warn when this task's timer value differs from the mean over testComm by
 * more than outlierThreshold.  The mean and standard deviation are computed
 * with two MPI_Allreduce() sums.
 * NOTE(review): listing line 233 (function name and first parameter) is
 * missing; the body uses a numTasks value that is presumably that parameter.
 */
232 static void
234  double timerVal,
235  char *timeString, int access, int outlierThreshold)
236 {
237  char accessString[MAX_STR];
238  double sum, mean, sqrDiff, var, sd;
239 
/* NOTE(review): the comment below says "don't compensate", yet the next
 * statement adds wall_clock_delta -- confirm which one is intended. */
240  /* for local timerVal, don't compensate for wall clock delta */
241  timerVal += wall_clock_delta;
242 
243  MPI_CHECK(MPI_Allreduce
244  (&timerVal, &sum, 1, MPI_DOUBLE, MPI_SUM, testComm),
245  "MPI_Allreduce()");
246  mean = sum / numTasks;
247  sqrDiff = pow((mean - timerVal), 2);
248  MPI_CHECK(MPI_Allreduce
249  (&sqrDiff, &var, 1, MPI_DOUBLE, MPI_SUM, testComm),
250  "MPI_Allreduce()");
251  var = var / numTasks;
252  sd = sqrt(var);
253 
/* any access value other than WRITE is labelled "read" */
254  if (access == WRITE) {
255  strcpy(accessString, "write");
256  } else { /* READ */
257  strcpy(accessString, "read");
258  }
/* the warning is printed by every task whose value is an outlier */
259  if (fabs(timerVal - mean) > (double)outlierThreshold) {
260  char hostname[MAX_STR];
261  int ret = gethostname(hostname, MAX_STR);
262  if (ret != 0)
263  strcpy(hostname, "unknown");
264 
265  fprintf(out_logfile, "WARNING: for %s, task %d, %s %s is %f\n",
266  hostname, rank, accessString, timeString, timerVal);
267  fprintf(out_logfile, " (mean=%f, stddev=%f)\n", mean, sd);
268  fflush(out_logfile);
269  }
270 }
271 
272 /*
273  * Check for outliers in start/end times and elapsed create/xfer/close times.
274  */
275 static void
276 CheckForOutliers(IOR_param_t *test, const double *timer, const int access)
277 {
/* timer[] is read at indices 0..5: absolute start (timer[0]) and end
 * (timer[5]) plus the elapsed pairs 1-0, 3-2 and 5-4 labelled below. */
278  DisplayOutliers(test->numTasks, timer[0],
279  "start time", access, test->outlierThreshold);
/* NOTE(review): listing lines 280, 283 and 287 are missing; each of the three
 * argument groups below is the tail of a call whose first line is not
 * visible here. */
281  timer[1] - timer[0],
282  "elapsed create time", access, test->outlierThreshold);
284  timer[3] - timer[2],
285  "elapsed transfer time", access,
286  test->outlierThreshold);
288  timer[5] - timer[4],
289  "elapsed close time", access, test->outlierThreshold);
290  DisplayOutliers(test->numTasks, timer[5], "end time",
291  access, test->outlierThreshold);
292 }
293 
294 /*
295  * Check if actual file size equals expected size; if not use actual for
296  * calculating performance rate.
297  */
298 static void CheckFileSize(IOR_test_t *test, IOR_offset_t dataMoved, int rep,
299  const int access)
300 {
301  IOR_param_t *params = &test->params;
302  IOR_results_t *results = test->results;
303  IOR_point_t *point = (access == WRITE) ? &results[rep].write :
304  &results[rep].read;
305 
306  MPI_CHECK(MPI_Allreduce(&dataMoved, &point->aggFileSizeFromXfer,
307  1, MPI_LONG_LONG_INT, MPI_SUM, testComm),
308  "cannot total data moved");
309 
310  if (strcasecmp(params->api, "HDF5") != 0 && strcasecmp(params->api, "NCMPI") != 0) {
311  if (verbose >= VERBOSE_0 && rank == 0) {
312  if ((params->expectedAggFileSize
313  != point->aggFileSizeFromXfer)
314  || (point->aggFileSizeFromStat
315  != point->aggFileSizeFromXfer)) {
316  fprintf(out_logfile,
317  "WARNING: Expected aggregate file size = %lld.\n",
318  (long long) params->expectedAggFileSize);
319  fprintf(out_logfile,
320  "WARNING: Stat() of aggregate file size = %lld.\n",
321  (long long) point->aggFileSizeFromStat);
322  fprintf(out_logfile,
323  "WARNING: Using actual aggregate bytes moved = %lld.\n",
324  (long long) point->aggFileSizeFromXfer);
325  if(params->deadlineForStonewalling){
326  fprintf(out_logfile,
327  "WARNING: maybe caused by deadlineForStonewalling\n");
328  }
329  }
330  }
331  }
332 
333  point->aggFileSizeForBW = point->aggFileSizeFromXfer;
334 }
335 
336 /*
337  * Compare buffers after reading/writing each transfer. Displays only first
338  * difference in buffers and returns total errors counted.
339  */
340 static size_t
341 CompareBuffers(void *expectedBuffer,
342  void *unknownBuffer,
343  size_t size,
344  IOR_offset_t transferCount, IOR_param_t *test, int access)
345 {
346  char testFileName[MAX_PATHLEN];
347  char bufferLabel1[MAX_STR];
348  char bufferLabel2[MAX_STR];
349  size_t i, j, length, first, last;
350  size_t errorCount = 0;
351  int inError = 0;
352  unsigned long long *goodbuf = (unsigned long long *)expectedBuffer;
353  unsigned long long *testbuf = (unsigned long long *)unknownBuffer;
354 
355  if (access == WRITECHECK || access == READCHECK) {
356  strcpy(bufferLabel1, "Expected: ");
357  strcpy(bufferLabel2, "Actual: ");
358  } else {
359  ERR("incorrect argument for CompareBuffers()");
360  }
361 
362  length = size / sizeof(IOR_size_t);
363  first = -1;
364  if (verbose >= VERBOSE_3) {
365  fprintf(out_logfile,
366  "[%d] At file byte offset %lld, comparing %llu-byte transfer\n",
367  rank, test->offset, (long long)size);
368  }
369  for (i = 0; i < length; i++) {
370  if (testbuf[i] != goodbuf[i]) {
371  errorCount++;
372  if (verbose >= VERBOSE_2) {
373  fprintf(out_logfile,
374  "[%d] At transfer buffer #%lld, index #%lld (file byte offset %lld):\n",
375  rank, transferCount - 1, (long long)i,
376  test->offset +
377  (IOR_size_t) (i * sizeof(IOR_size_t)));
378  fprintf(out_logfile, "[%d] %s0x", rank, bufferLabel1);
379  fprintf(out_logfile, "%016llx\n", goodbuf[i]);
380  fprintf(out_logfile, "[%d] %s0x", rank, bufferLabel2);
381  fprintf(out_logfile, "%016llx\n", testbuf[i]);
382  }
383  if (!inError) {
384  inError = 1;
385  first = i;
386  last = i;
387  } else {
388  last = i;
389  }
390  } else if (verbose >= VERBOSE_5 && i % 4 == 0) {
391  fprintf(out_logfile,
392  "[%d] PASSED offset = %lld bytes, transfer %lld\n",
393  rank,
394  ((i * sizeof(unsigned long long)) +
395  test->offset), transferCount);
396  fprintf(out_logfile, "[%d] GOOD %s0x", rank, bufferLabel1);
397  for (j = 0; j < 4; j++)
398  fprintf(out_logfile, "%016llx ", goodbuf[i + j]);
399  fprintf(out_logfile, "\n[%d] GOOD %s0x", rank, bufferLabel2);
400  for (j = 0; j < 4; j++)
401  fprintf(out_logfile, "%016llx ", testbuf[i + j]);
402  fprintf(out_logfile, "\n");
403  }
404  }
405  if (inError) {
406  inError = 0;
407  GetTestFileName(testFileName, test);
408  fprintf(out_logfile,
409  "[%d] FAILED comparison of buffer containing %d-byte ints:\n",
410  rank, (int)sizeof(unsigned long long int));
411  fprintf(out_logfile, "[%d] File name = %s\n", rank, testFileName);
412  fprintf(out_logfile, "[%d] In transfer %lld, ", rank,
413  transferCount);
414  fprintf(out_logfile,
415  "%lld errors between buffer indices %lld and %lld.\n",
416  (long long)errorCount, (long long)first,
417  (long long)last);
418  fprintf(out_logfile, "[%d] File byte offset = %lld:\n", rank,
419  ((first * sizeof(unsigned long long)) + test->offset));
420 
421  fprintf(out_logfile, "[%d] %s0x", rank, bufferLabel1);
422  for (j = first; j < length && j < first + 4; j++)
423  fprintf(out_logfile, "%016llx ", goodbuf[j]);
424  if (j == length)
425  fprintf(out_logfile, "[end of buffer]");
426  fprintf(out_logfile, "\n[%d] %s0x", rank, bufferLabel2);
427  for (j = first; j < length && j < first + 4; j++)
428  fprintf(out_logfile, "%016llx ", testbuf[j]);
429  if (j == length)
430  fprintf(out_logfile, "[end of buffer]");
431  fprintf(out_logfile, "\n");
432  if (test->quitOnError == TRUE)
433  ERR("data check error, aborting execution");
434  }
435  return (errorCount);
436 }
437 
438 /*
439  * Count all errors across all tasks; report errors found.
440  */
/*
 * Sum the per-task error counts over testComm and make the total known to
 * every task.  Does nothing (and returns 0) unless checkWrite or checkRead
 * is set.  A non-zero total is added to totalErrorCount and sets
 * test->errorFound; rank 0 additionally prints a warning.
 */
441 static int CountErrors(IOR_param_t * test, int access, int errors)
442 {
443  int allErrors = 0;
444 
445  if (test->checkWrite || test->checkRead) {
446  MPI_CHECK(MPI_Reduce(&errors, &allErrors, 1, MPI_INT, MPI_SUM,
447  0, testComm), "cannot reduce errors");
448  MPI_CHECK(MPI_Bcast(&allErrors, 1, MPI_INT, 0, testComm),
449  "cannot broadcast allErrors value");
450  if (allErrors != 0) {
451  totalErrorCount += allErrors;
452  test->errorFound = TRUE;
453  }
454  if (rank == 0 && allErrors != 0) {
/* a negative sum is treated as int overflow; only rank 0 rewrites its
 * local value to -1, after the broadcast */
455  if (allErrors < 0) {
456  WARN("overflow in errors counted");
457  allErrors = -1;
458  }
459  fprintf(out_logfile, "WARNING: incorrect data on %s (%d errors found).\n",
460  access == WRITECHECK ? "write" : "read", allErrors);
461  fprintf(out_logfile,
462  "Used Time Stamp %u (0x%x) for Data Signature\n",
/* NOTE(review): listing lines 463-464 (the arguments and closing of this
 * fprintf call) are missing -- restore them from the original file. */
465  }
466  }
467  return (allErrors);
468 }
469 
/*
 * Allocate a page-aligned (required by O_DIRECT) buffer of `size` bytes.
 *
 * The block is over-allocated by one page plus one pointer.  The address
 * returned by malloc() is stored in the bytes just before the aligned
 * address so that aligned_buffer_free() can recover it.
 */
static void *aligned_buffer_alloc(size_t size)
{
#ifdef HAVE_SYSCONF
        long pageSize = sysconf(_SC_PAGESIZE);
#else
        size_t pageSize = getpagesize();
#endif
        size_t pageMask = pageSize - 1;
        char *raw;
        char *candidate;
        char *aligned;
        size_t misalignment;

        raw = malloc(size + pageSize + sizeof(void *));
        if (raw == NULL)
                ERR("out of memory");

        /* leave room for the back-pointer, then round up to a page boundary */
        candidate = raw + sizeof(char *);
        misalignment = (size_t) candidate & pageMask;
        aligned = candidate + pageSize - misalignment;

        /* stash the malloc()ed address right below the aligned buffer */
        memcpy(aligned - sizeof(void *), &raw, sizeof(void *));

        return (void *)aligned;
}
499 
/*
 * Free a buffer allocated by aligned_buffer_alloc().
 *
 * The pointer to hand to free() is read from the pointer-sized slot
 * immediately preceding `buf`.
 */
static void aligned_buffer_free(void *buf)
{
        void *original;

        memcpy(&original, (char *)buf - sizeof(char *), sizeof(original));
        free(original);
}
507 
509 {
/* NOTE(review): the function signature (listing line 508) is not visible
 * here; the body operates on a pointer named test. */
510  int reps;
/* results already allocated: keep the existing array */
511  if (test->results != NULL)
512  return;
513 
/* one IOR_results_t slot per repetition */
514  reps = test->params.repetitions;
515  test->results = (IOR_results_t *) safeMalloc(sizeof(IOR_results_t) * reps);
516 }
517 
519 {
/* NOTE(review): the function signature (listing line 518) is not visible
 * here.  test->results is freed but not reset to NULL afterwards. */
520  if (test->results != NULL) {
521  free(test->results);
522  }
523 }
524 
525 
529 IOR_test_t *CreateTest(IOR_param_t *init_params, int test_num)
530 {
531  IOR_test_t *newTest = NULL;
532 
533  newTest = (IOR_test_t *) malloc(sizeof(IOR_test_t));
534  if (newTest == NULL)
535  ERR("malloc() of IOR_test_t failed");
536  newTest->params = *init_params;
537  newTest->params.platform = GetPlatformName();
538  newTest->params.id = test_num;
539  newTest->next = NULL;
540  newTest->results = NULL;
541 
542  return newTest;
543 }
544 
545 static void DestroyTest(IOR_test_t *test)
546 {
547  FreeResults(test);
548  free(test);
549 }
550 
551 static void DestroyTests(IOR_test_t *tests_head)
552 {
553  IOR_test_t *tptr, *next;
554 
555  for (tptr = tests_head; tptr != NULL; tptr = next) {
556  next = tptr->next;
557  DestroyTest(tptr);
558  }
559 }
560 
561 /*
562  * Distribute IOR_HINTs to all tasks' environments.
563  */
564 void DistributeHints(void)
565 {
566  char hint[MAX_HINTS][MAX_STR], fullHint[MAX_STR], hintVariable[MAX_STR];
567  int hintCount = 0, i;
568 
569  if (rank == 0) {
570  for (i = 0; environ[i] != NULL; i++) {
571  if (strncmp(environ[i], "IOR_HINT", strlen("IOR_HINT"))
572  == 0) {
573  hintCount++;
574  if (hintCount == MAX_HINTS) {
575  WARN("exceeded max hints; reset MAX_HINTS and recompile");
576  hintCount = MAX_HINTS;
577  break;
578  }
579  /* assume no IOR_HINT is greater than MAX_STR in length */
580  strncpy(hint[hintCount - 1], environ[i],
581  MAX_STR - 1);
582  }
583  }
584  }
585 
586  MPI_CHECK(MPI_Bcast(&hintCount, sizeof(hintCount), MPI_BYTE,
587  0, MPI_COMM_WORLD), "cannot broadcast hints");
588  for (i = 0; i < hintCount; i++) {
589  MPI_CHECK(MPI_Bcast(&hint[i], MAX_STR, MPI_BYTE,
590  0, MPI_COMM_WORLD),
591  "cannot broadcast hints");
592  strcpy(fullHint, hint[i]);
593  strcpy(hintVariable, strtok(fullHint, "="));
594  if (getenv(hintVariable) == NULL) {
595  /* doesn't exist in this task's environment; better set it */
596  if (putenv(hint[i]) != 0)
597  WARN("cannot set environment variable");
598  }
599  }
600 }
601 
602 /*
603  * Fill buffer, which is transfer size bytes long, with known 8-byte long long
604  * int values. In even-numbered 8-byte long long ints, store MPI task in high
605  * bits and timestamp signature in low bits. In odd-numbered 8-byte long long
606  * ints, store transfer offset. If storeFileOffset option is used, the file
607  * (not transfer) offset is stored instead.
608  */
609 
/* NOTE(review): listing line 611 (function name and parameters) is missing;
 * the body uses a buffer pointer named buffer and a parameter block named
 * test. */
610 static void
612 
613 {
614  size_t i;
615  unsigned long long hi, lo;
616  unsigned long long *buf = (unsigned long long *)buffer;
617 
/* fill transferSize bytes, one 8-byte word at a time; each word is built
 * from two consecutive rand_r() draws that advance test->incompressibleSeed */
618  for (i = 0; i < test->transferSize / sizeof(unsigned long long); i++) {
619  hi = ((unsigned long long) rand_r(&test->incompressibleSeed) << 32);
620  lo = (unsigned long long) rand_r(&test->incompressibleSeed);
621  buf[i] = hi | lo;
622  }
623 }
624 
626 
/*
 * Fill a transfer buffer of test->transferSize bytes.  For the
 * incompressible packet type the words come from
 * FillIncompressibleBuffer(); otherwise even words hold fillrank (high 32
 * bits) ORed with the time stamp signature and odd words hold offset plus
 * the word's byte position in the buffer.
 */
627 static void
628 FillBuffer(void *buffer,
629  IOR_param_t * test, unsigned long long offset, int fillrank)
630 {
631  size_t i;
632  unsigned long long hi, lo;
633  unsigned long long *buf = (unsigned long long *)buffer;
634 
635  if(test->dataPacketType == incompressible ) { /* Make for some non compressable buffers with randomish data */
636 
637  /* In order for write checks to work, we have to restart the pseudo random sequence */
/* NOTE(review): listing lines 638 and 640 are missing; the closing brace on
 * line 641 implies the assignment below sat inside a conditional. */
639  test->incompressibleSeed = test->setTimeStampSignature + rank; /* We copied seed into timestampSignature at initialization, also add the rank to add randomness between processes */
641  }
642  FillIncompressibleBuffer(buffer, test);
643  }
644 
645  else {
646  hi = ((unsigned long long)fillrank) << 32;
647  lo = (unsigned long long)test->timeStampSignatureValue;
648  for (i = 0; i < test->transferSize / sizeof(unsigned long long); i++) {
649  if ((i % 2) == 0) {
650  /* evens contain MPI rank and time in seconds */
651  buf[i] = hi | lo;
652  } else {
653  /* odds contain offset */
654  buf[i] = offset + (i * sizeof(unsigned long long));
655  }
656  }
657  }
658 }
659 
660 /*
661  * Return string describing machine name and type.
662  */
664 {
/* NOTE(review): the function signature (listing line 663) is not visible
 * here.  The result is a strdup()ed string of the form "<node>(<system>)". */
665  char nodeName[MAX_STR], *p, *start, sysName[MAX_STR];
666  char platformName[MAX_STR];
667  struct utsname name;
668 
/* both names fall back to "Unknown" when uname() fails */
669  if (uname(&name) != 0) {
670  EWARN("cannot get platform name");
671  sprintf(sysName, "%s", "Unknown");
672  sprintf(nodeName, "%s", "Unknown");
673  } else {
674  sprintf(sysName, "%s", name.sysname);
675  sprintf(nodeName, "%s", name.nodename);
676  }
677 
678  start = nodeName;
679  if (strlen(nodeName) == 0) {
680  p = start;
681  } else {
682  /* point to one character back from '\0' */
683  p = start + strlen(nodeName) - 1;
684  }
685  /*
686  * to cut off trailing node number, search backwards
687  * for the first non-numeric character
688  */
/* the first character is never examined: if everything after it is a digit,
 * the loop ends at start and the name is left unchanged */
689  while (p != start) {
690  if (*p < '0' || *p > '9') {
691  *(p + 1) = '\0';
692  break;
693  } else {
694  p--;
695  }
696  }
697 
698  sprintf(platformName, "%s(%s)", nodeName, sysName);
/* heap copy; the caller is responsible for releasing it */
699  return strdup(platformName);
700 }
701 
702 
703 
704 /*
705  * Parse file name.
706  */
707 static char **ParseFileName(char *name, int *count)
708 {
709  char **fileNames, *tmp, *token;
710  char delimiterString[3] = { FILENAME_DELIMITER, '\n', '\0' };
711  int i = 0;
712 
713  *count = 0;
714  tmp = name;
715 
716  /* pass one */
717  /* if something there, count the first item */
718  if (*tmp != '\0') {
719  (*count)++;
720  }
721  /* count the rest of the filenames */
722  while (*tmp != '\0') {
723  if (*tmp == FILENAME_DELIMITER) {
724  (*count)++;
725  }
726  tmp++;
727  }
728 
729  fileNames = (char **)malloc((*count) * sizeof(char **));
730  if (fileNames == NULL)
731  ERR("out of memory");
732 
733  /* pass two */
734  token = strtok(name, delimiterString);
735  while (token != NULL) {
736  fileNames[i] = token;
737  token = strtok(NULL, delimiterString);
738  i++;
739  }
740  return (fileNames);
741 }
742 
743 
744 /*
745  * Return test file name to access.
746  * for single shared file, fileNames[0] is returned in testFileName
747  */
748 void GetTestFileName(char *testFileName, IOR_param_t * test)
749 {
750  char **fileNames;
751  char initialTestFileName[MAX_PATHLEN];
752  char testFileNameRoot[MAX_STR];
753  char tmpString[MAX_STR];
754  int count;
755 
756  /* parse filename for multiple file systems */
757  strcpy(initialTestFileName, test->testFileName);
758  fileNames = ParseFileName(initialTestFileName, &count);
759  if (count > 1 && test->uniqueDir == TRUE)
760  ERR("cannot use multiple file names with unique directories");
761  if (test->filePerProc) {
762  strcpy(testFileNameRoot,
763  fileNames[((rank +
764  rankOffset) % test->numTasks) % count]);
765  } else {
766  strcpy(testFileNameRoot, fileNames[0]);
767  }
768 
769  /* give unique name if using multiple files */
770  if (test->filePerProc) {
771  /*
772  * prepend rank subdirectory before filename
773  * e.g., /dir/file => /dir/<rank>/file
774  */
775  if (test->uniqueDir == TRUE) {
776  strcpy(testFileNameRoot,
777  PrependDir(test, testFileNameRoot));
778  }
779  sprintf(testFileName, "%s.%08d", testFileNameRoot,
780  (rank + rankOffset) % test->numTasks);
781  } else {
782  strcpy(testFileName, testFileNameRoot);
783  }
784 
785  /* add suffix for multiple files */
786  if (test->repCounter > -1) {
787  sprintf(tmpString, ".%d", test->repCounter);
788  strcat(testFileName, tmpString);
789  }
790  free (fileNames);
791 }
792 
793 /*
794  * From absolute directory, insert rank as subdirectory. Allows each task
795  * to write to its own directory. E.g., /dir/file => /dir/<rank>/file.
796  */
797 static char *PrependDir(IOR_param_t * test, char *rootDir)
798 {
799  char *dir;
800  char *fname;
801  int i;
802 
803  dir = (char *)malloc(MAX_STR + 1);
804  if (dir == NULL)
805  ERR("out of memory");
806 
807  /* get dir name */
808  strcpy(dir, rootDir);
809  i = strlen(dir) - 1;
810  while (i > 0) {
811  if (dir[i] == '\0' || dir[i] == '/') {
812  dir[i] = '/';
813  dir[i + 1] = '\0';
814  break;
815  }
816  i--;
817  }
818 
819  /* get file name */
820  fname = rootDir + i + 1;
821 
822  /* create directory with rank as subdirectory */
823  sprintf(dir + i + 1, "%d", (rank + rankOffset) % test->numTasks);
824 
825  /* dir doesn't exist, so create */
826  if (backend->access(dir, F_OK, test) != 0) {
827  if (backend->mkdir(dir, S_IRWXU, test) < 0) {
828  ERRF("cannot create directory: %s", dir);
829  }
830 
831  /* check if correct permissions */
832  } else if (backend->access(dir, R_OK, test) != 0 ||
833  backend->access(dir, W_OK, test) != 0 ||
834  backend->access(dir, X_OK, test) != 0) {
835  ERRF("invalid directory permissions: %s", dir);
836  }
837 
838  /* concatenate dir and file names */
839  strcat(dir, "/");
840  strcat(dir, fname);
841 
842  return dir;
843 }
844 
845 /******************************************************************************/
846 /*
847  * Reduce test results, and show if verbose set.
848  */
849 static void
850 ReduceIterResults(IOR_test_t *test, double *timer, const int rep, const int access)
851 {
852  double reduced[IOR_NB_TIMERS] = { 0 };
853  double diff[IOR_NB_TIMERS / 2 + 1];
854  double totalTime, accessTime;
855  IOR_param_t *params = &test->params;
856  double bw, iops, latency, minlatency;
857  int i;
858  MPI_Op op;
859 
860  assert(access == WRITE || access == READ);
861 
862  /* Find the minimum start time of the even numbered timers, and the
863  maximum finish time for the odd numbered timers */
864  for (i = 0; i < IOR_NB_TIMERS; i++) {
865  op = i % 2 ? MPI_MAX : MPI_MIN;
866  MPI_CHECK(MPI_Reduce(&timer[i], &reduced[i], 1, MPI_DOUBLE,
867  op, 0, testComm), "MPI_Reduce()");
868  }
869 
870  /* Calculate elapsed times and throughput numbers */
871  for (i = 0; i < IOR_NB_TIMERS / 2; i++)
872  diff[i] = reduced[2 * i + 1] - reduced[2 * i];
873 
874  totalTime = reduced[5] - reduced[0];
875  accessTime = reduced[3] - reduced[2];
876 
877  IOR_point_t *point = (access == WRITE) ? &test->results[rep].write :
878  &test->results[rep].read;
879 
880  point->time = totalTime;
881 
882  if (verbose < VERBOSE_0)
883  return;
884 
885  bw = (double)point->aggFileSizeForBW / totalTime;
886 
887  /* For IOPS in this iteration, we divide the total amount of IOs from
888  * all ranks over the entire access time (first start -> last end). */
889  iops = (point->aggFileSizeForBW / params->transferSize) / accessTime;
890 
891  /* For Latency, we divide the total access time for each task over the
892  * number of I/Os issued from that task; then reduce and display the
893  * minimum (best) latency achieved. So what is reported is the average
894  * latency of all ops from a single task, then taking the minimum of
895  * that between all tasks. */
896  latency = (timer[3] - timer[2]) / (params->blockSize / params->transferSize);
897  MPI_CHECK(MPI_Reduce(&latency, &minlatency, 1, MPI_DOUBLE,
898  MPI_MIN, 0, testComm), "MPI_Reduce()");
899 
900  /* Only rank 0 tallies and prints the results. */
901  if (rank != 0)
902  return;
903 
904  PrintReducedResult(test, access, bw, iops, latency, diff, totalTime, rep);
905 }
906 
907 /*
908  * Check for file(s), then remove all files if file-per-proc, else single file.
909  *
910  */
911 static void RemoveFile(char *testFileName, int filePerProc, IOR_param_t * test)
912 {
913  int tmpRankOffset = 0;
914  if (filePerProc) {
915  /* in random tasks, delete own file */
916  if (test->reorderTasksRandom == TRUE) {
917  tmpRankOffset = rankOffset;
918  rankOffset = 0;
919  GetTestFileName(testFileName, test);
920  }
921  if (backend->access(testFileName, F_OK, test) == 0) {
922  if (verbose >= VERBOSE_3) {
923  fprintf(out_logfile, "task %d removing %s\n", rank,
924  testFileName);
925  }
926  backend->delete(testFileName, test);
927  }
928  if (test->reorderTasksRandom == TRUE) {
929  rankOffset = tmpRankOffset;
930  GetTestFileName(testFileName, test);
931  }
932  } else {
933  if ((rank == 0) && (backend->access(testFileName, F_OK, test) == 0)) {
934  if (verbose >= VERBOSE_3) {
935  fprintf(out_logfile, "task %d removing %s\n", rank,
936  testFileName);
937  }
938  backend->delete(testFileName, test);
939  }
940  }
941 }
942 
943 /*
944  * Setup tests by parsing commandline and creating test script.
945  * Perform a sanity-check on the configured parameters.
946  */
/* Fills in MPI-derived defaults for every test in the list, validates each
 * test, distributes hints and initializes the clock. */
947 static void InitTests(IOR_test_t *tests, MPI_Comm com)
948 {
949  int mpiNumNodes = 0;
950  int mpiNumTasks = 0;
951  int mpiNumTasksOnNode0 = 0;
952 
953  /*
954  * These default values are the same for every test and expensive to
955  * retrieve so just do it once.
956  */
957  mpiNumNodes = GetNumNodes(com);
958  mpiNumTasks = GetNumTasks(com);
959  mpiNumTasksOnNode0 = GetNumTasksOnNode0(com);
960 
961  /*
962  * Since there is no guarantee that anyone other than
963  * task 0 has the environment settings for the hints, pass
964  * the hint=value pair to everyone else in mpi_comm_world
965  */
966  DistributeHints();
967 
968  /* check validity of tests and create test queue */
969  while (tests != NULL) {
970  IOR_param_t *params = & tests->params;
971  params->testComm = com;
972 
973  /* use MPI values if not overridden on command-line */
974  if (params->numNodes == -1) {
975  params->numNodes = mpiNumNodes;
976  }
977  if (params->numTasks == -1) {
978  params->numTasks = mpiNumTasks;
979  } else if (params->numTasks > mpiNumTasks) {
/* a request for more tasks than exist is clamped, with a warning on rank 0 */
980  if (rank == 0) {
981  fprintf(out_logfile,
982  "WARNING: More tasks requested (%d) than available (%d),",
983  params->numTasks, mpiNumTasks);
984  fprintf(out_logfile, " running with %d tasks.\n",
985  mpiNumTasks);
986  }
987  params->numTasks = mpiNumTasks;
988  }
989  if (params->numTasksOnNode0 == -1) {
990  params->numTasksOnNode0 = mpiNumTasksOnNode0;
991  }
992 
993  params->tasksBlockMapping = QueryNodeMapping(com,false);
/* expected size is computed after numTasks has been settled above */
994  params->expectedAggFileSize =
995  params->blockSize * params->segmentCount * params->numTasks;
996 
997  ValidateTests(&tests->params);
998  tests = tests->next;
999  }
1000 
1001  init_clock();
1002 
1003  /* seed random number generator */
/* NOTE(review): listing line 1004, the statement this comment announces, is
 * missing -- restore it from the original file. */
1005 }
1006 
1007 /*
1008  * Setup transfer buffers, creating and filling as needed.
1009  */
1010 static void XferBuffersSetup(IOR_io_buffers* ioBuffers, IOR_param_t* test,
1011  int pretendRank)
1012 {
/* the transfer buffer is always allocated, page-aligned */
1013  ioBuffers->buffer = aligned_buffer_alloc(test->transferSize);
1014 
/* the check buffer exists only when write or read checking is enabled */
1015  if (test->checkWrite || test->checkRead) {
1016  ioBuffers->checkBuffer = aligned_buffer_alloc(test->transferSize);
1017  }
1018  if (test->checkRead || test->checkWrite) {
/* NOTE(review): listing line 1019, the body of this branch, is missing.
 * pretendRank is not used by any line visible here. */
1020  }
1021 
1022  return;
1023 }
1024 
1025 /*
1026  * Free transfer buffers.
1027  */
1028 static void XferBuffersFree(IOR_io_buffers* ioBuffers, IOR_param_t* test)
1029 
1030 {
/* buffers come from aligned_buffer_alloc(), so they need the matching free */
1031  aligned_buffer_free(ioBuffers->buffer);
1032 
1033  if (test->checkWrite || test->checkRead) {
1034  aligned_buffer_free(ioBuffers->checkBuffer);
1035  }
1036  if (test->checkRead) {
/* NOTE(review): listing line 1037, the body of this branch, is missing. */
1038  }
1039 
1040  return;
1041 }
1042 
1043 
1044 
/*
 * malloc a buffer and write one byte into every page of it, in an attempt
 * to defeat lazy allocation.
 *
 * Returns NULL when size is 0 or when malloc() fails.
 */
static void *malloc_and_touch(size_t size)
{
        size_t stride;
        size_t offset;
        char *block;

        if (size == 0)
                return NULL;

        stride = sysconf(_SC_PAGESIZE);

        block = (char *)malloc(size);
        if (block == NULL)
                return NULL;

        /* first byte of each page-sized step, starting at offset 0 */
        for (offset = 0; offset < size; offset += stride) {
                block[offset] = (char)1;
        }

        return (void *)block;
}
1069 
1070 static void file_hits_histogram(IOR_param_t *params)
1071 {
1072  int *rankoffs = NULL;
1073  int *filecont = NULL;
1074  int *filehits = NULL;
1075  int ifile;
1076  int jfile;
1077 
1078  if (rank == 0) {
1079  rankoffs = (int *)malloc(params->numTasks * sizeof(int));
1080  filecont = (int *)malloc(params->numTasks * sizeof(int));
1081  filehits = (int *)malloc(params->numTasks * sizeof(int));
1082  }
1083 
1084  MPI_CHECK(MPI_Gather(&rankOffset, 1, MPI_INT, rankoffs,
1085  1, MPI_INT, 0, mpi_comm_world),
1086  "MPI_Gather error");
1087 
1088  if (rank != 0)
1089  return;
1090 
1091  memset((void *)filecont, 0, params->numTasks * sizeof(int));
1092  for (ifile = 0; ifile < params->numTasks; ifile++) {
1093  filecont[(ifile + rankoffs[ifile]) % params->numTasks]++;
1094  }
1095  memset((void *)filehits, 0, params->numTasks * sizeof(int));
1096  for (ifile = 0; ifile < params->numTasks; ifile++)
1097  for (jfile = 0; jfile < params->numTasks; jfile++) {
1098  if (ifile == filecont[jfile])
1099  filehits[ifile]++;
1100  }
1101  fprintf(out_logfile, "#File Hits Dist:");
1102  jfile = 0;
1103  ifile = 0;
1104  while (jfile < params->numTasks && ifile < params->numTasks) {
1105  fprintf(out_logfile, " %d", filehits[ifile]);
1106  jfile += filehits[ifile], ifile++;
1107  }
1108  fprintf(out_logfile, "\n");
1109  free(rankoffs);
1110  free(filecont);
1111  free(filehits);
1112 }
1113 
1114 
1115 int test_time_elapsed(IOR_param_t *params, double startTime)
1116 {
1117  double endTime;
1118 
1119  if (params->maxTimeDuration == 0)
1120  return 0;
1121 
1122  endTime = startTime + (params->maxTimeDuration * 60);
1123 
1124  return GetTimeStamp() >= endTime;
1125 }
1126 
1127 /*
1128  * hog some memory as a rough simulation of a real application's memory use
1129  */
1130 static void *HogMemory(IOR_param_t *params)
1131 {
1132  size_t size;
1133  void *buf;
1134 
1135  if (params->memoryPerTask != 0) {
1136  size = params->memoryPerTask;
1137  } else if (params->memoryPerNode != 0) {
1138  if (verbose >= VERBOSE_3)
1139  fprintf(out_logfile, "This node hogging %ld bytes of memory\n",
1140  params->memoryPerNode);
1141  size = params->memoryPerNode / params->numTasksOnNode0;
1142  } else {
1143  return NULL;
1144  }
1145 
1146  if (verbose >= VERBOSE_3)
1147  fprintf(out_logfile, "This task hogging %ld bytes of memory\n", size);
1148 
1149  buf = malloc_and_touch(size);
1150  if (buf == NULL)
1151  ERR("malloc of simulated applciation buffer failed");
1152 
1153  return buf;
1154 }
1155 /*
1156  * Write times taken during each iteration of the test.
1157  */
1158 static void
1159 WriteTimes(IOR_param_t *test, const double *timer, const int iteration,
1160  const int access)
1161 {
1162  char timerName[MAX_STR];
1163 
1164  for (int i = 0; i < IOR_NB_TIMERS; i++) {
1165 
1166  if (access == WRITE) {
1167  switch (i) {
1168  case 0:
1169  strcpy(timerName, "write open start");
1170  break;
1171  case 1:
1172  strcpy(timerName, "write open stop");
1173  break;
1174  case 2:
1175  strcpy(timerName, "write start");
1176  break;
1177  case 3:
1178  strcpy(timerName, "write stop");
1179  break;
1180  case 4:
1181  strcpy(timerName, "write close start");
1182  break;
1183  case 5:
1184  strcpy(timerName, "write close stop");
1185  break;
1186  default:
1187  strcpy(timerName, "invalid timer");
1188  break;
1189  }
1190  }
1191  else {
1192  switch (i) {
1193  case 0:
1194  strcpy(timerName, "read open start");
1195  break;
1196  case 1:
1197  strcpy(timerName, "read open stop");
1198  break;
1199  case 2:
1200  strcpy(timerName, "read start");
1201  break;
1202  case 3:
1203  strcpy(timerName, "read stop");
1204  break;
1205  case 4:
1206  strcpy(timerName, "read close start");
1207  break;
1208  case 5:
1209  strcpy(timerName, "read close stop");
1210  break;
1211  default:
1212  strcpy(timerName, "invalid timer");
1213  break;
1214  }
1215  }
1216  fprintf(out_logfile, "Test %d: Iter=%d, Task=%d, Time=%f, %s\n",
1217  test->id, iteration, (int)rank, timer[i],
1218  timerName);
1219  }
1220 }
1221 /*
1222  * Using the test parameters, run iteration(s) of single test.
 *
 * test - the test to run; test->params drives it and test->results[rep]
 *        receives the outcome of repetition 'rep'.
 *
 * Outline, as far as this function shows:
 *  - testComm is created from ranks 0 .. numTasks-1 of mpi_comm_world; ranks
 *    left out only join one barrier on mpi_comm_world and return.
 *  - Per repetition, each guarded by test_time_elapsed(): the write phase
 *    (writeFile), the write check (checkWrite) and the read / read-check
 *    phase (readFile or checkRead), then removal of the test file unless it
 *    is to be kept.
 *  - Each timed phase fills timer[0..5]: before/after open (or create),
 *    before/after WriteOrRead(), before/after close.
 *  - rankOffset (file scope) is changed for the check/read phases and put
 *    back to 0 afterwards.
 *
 * NOTE(review): the numbering of this listing skips lines 1425 and 1544-1545,
 * so statements appear to be missing at those two places -- see the notes
 * next to them.
1223  */
1224 static void TestIoSys(IOR_test_t *test)
1225 {
1226  IOR_param_t *params = &test->params;
1227  IOR_results_t *results = test->results;
1228  char testFileName[MAX_STR];
1229  double timer[IOR_NB_TIMERS];
1230  double startTime;
1231  int pretendRank;
1232  int rep;
1233  void *fd;
1234  MPI_Group orig_group, new_group;
1235  int range[3];
1236  IOR_offset_t dataMoved; /* for data rate calculation */
1237  void *hog_buf;
1238  IOR_io_buffers ioBuffers;
1239 
1240  /* set up communicator for test */
1241  MPI_CHECK(MPI_Comm_group(mpi_comm_world, &orig_group),
1242  "MPI_Comm_group() error");
1243  range[0] = 0; /* first rank */
1244  range[1] = params->numTasks - 1; /* last rank */
1245  range[2] = 1; /* stride */
1246  MPI_CHECK(MPI_Group_range_incl(orig_group, 1, &range, &new_group),
1247  "MPI_Group_range_incl() error");
1248  MPI_CHECK(MPI_Comm_create(mpi_comm_world, new_group, &testComm),
1249  "MPI_Comm_create() error");
1250  MPI_CHECK(MPI_Group_free(&orig_group), "MPI_Group_Free() error");
1251  MPI_CHECK(MPI_Group_free(&new_group), "MPI_Group_Free() error");
1252  params->testComm = testComm;
1253  if (testComm == MPI_COMM_NULL) {
1254  /* tasks not in the group do not participate in this test */
 /* this barrier pairs with the mpi_comm_world barrier at the end of the function */
1255  MPI_CHECK(MPI_Barrier(mpi_comm_world), "barrier error");
1256  return;
1257  }
1258  if (rank == 0 && verbose >= VERBOSE_1) {
1259  fprintf(out_logfile, "Participating tasks: %d\n", params->numTasks);
1260  fflush(out_logfile);
1261  }
1262  if (rank == 0 && params->reorderTasks == TRUE && verbose >= VERBOSE_1) {
1263  fprintf(out_logfile,
1264  "Using reorderTasks '-C' (useful to avoid read cache in client)\n");
1265  fflush(out_logfile);
1266  }
1267  /* show test setup */
1268  if (rank == 0 && verbose >= VERBOSE_0)
1269  ShowSetup(params);
1270 
 /* may be NULL when no memory hogging was requested; freed at the end */
1271  hog_buf = HogMemory(params);
1272 
1273  pretendRank = (rank + rankOffset) % params->numTasks;
1274 
1275  /* IO Buffer Setup */
1276 
1277  if (params->setTimeStampSignature) { // initialize the buffer properly
1278  params->timeStampSignatureValue = (unsigned int) params->setTimeStampSignature;
1279  }
1280  XferBuffersSetup(&ioBuffers, params, pretendRank);
1281  reseed_incompressible_prng = TRUE; // reset pseudo random generator, necessary to guarantee the next call to FillBuffer produces the same value as it is right now
1282 
1283  /* Initial time stamp */
1284  startTime = GetTimeStamp();
1285 
1286  /* loop over test iterations */
 /* the write phase overwrites stoneWallingWearOutIterations; keep the configured value to restore it per repetition */
1287  uint64_t params_saved_wearout = params->stoneWallingWearOutIterations;
1288  for (rep = 0; rep < params->repetitions; rep++) {
1289  PrintRepeatStart();
1290  /* Get iteration start time in seconds in task 0 and broadcast to
1291  all tasks */
1292  if (rank == 0) {
1293  if (! params->setTimeStampSignature) {
1294  time_t currentTime;
1295  if ((currentTime = time(NULL)) == -1) {
1296  ERR("cannot get current time");
1297  }
1298  params->timeStampSignatureValue =
1299  (unsigned int) currentTime;
1300  if (verbose >= VERBOSE_2) {
1301  fprintf(out_logfile,
1302  "Using Time Stamp %u (0x%x) for Data Signature\n",
1303  params->timeStampSignatureValue,
1304  params->timeStampSignatureValue);
1305  }
1306  }
1307  if (rep == 0 && verbose >= VERBOSE_0) {
1308  PrintTableHeader();
1309  }
1310  }
1311  MPI_CHECK(MPI_Bcast
1312  (&params->timeStampSignatureValue, 1, MPI_UNSIGNED, 0,
1313  testComm), "cannot broadcast start time value");
1314 
1315  FillBuffer(ioBuffers.buffer, params, 0, pretendRank);
1316  /* use repetition count for number of multiple files */
1317  if (params->multiFile)
1318  params->repCounter = rep;
1319 
1320  /*
1321  * write the file(s), getting timing between I/O calls
1322  */
1323 
1324  if (params->writeFile && !test_time_elapsed(params, startTime)) {
1325  GetTestFileName(testFileName, params);
1326  if (verbose >= VERBOSE_3) {
1327  fprintf(out_logfile, "task %d writing %s\n", rank,
1328  testFileName);
1329  }
1330  DelaySecs(params->interTestDelay);
1331  if (params->useExistingTestFile == FALSE) {
1332  RemoveFile(testFileName, params->filePerProc,
1333  params);
1334  }
1335 
1336  params->stoneWallingWearOutIterations = params_saved_wearout;
1337  MPI_CHECK(MPI_Barrier(testComm), "barrier error");
1338  params->open = WRITE;
 /* timer[0]/timer[1]: before/after creating the file */
1339  timer[0] = GetTimeStamp();
1340  fd = backend->create(testFileName, params);
1341  timer[1] = GetTimeStamp();
1342  if (params->intraTestBarriers)
1343  MPI_CHECK(MPI_Barrier(testComm),
1344  "barrier error");
1345  if (rank == 0 && verbose >= VERBOSE_1) {
1346  fprintf(out_logfile,
1347  "Commencing write performance test: %s",
1348  CurrentTimeString());
1349  }
 /* timer[2]/timer[3]: before/after the data transfer */
1350  timer[2] = GetTimeStamp();
1351  dataMoved = WriteOrRead(params, &results[rep], fd, WRITE, &ioBuffers);
1352  if (params->verbose >= VERBOSE_4) {
1353  fprintf(out_logfile, "* data moved = %llu\n", dataMoved);
1354  fflush(out_logfile);
1355  }
1356  timer[3] = GetTimeStamp();
1357  if (params->intraTestBarriers)
1358  MPI_CHECK(MPI_Barrier(testComm),
1359  "barrier error");
 /* timer[4]/timer[5]: before/after closing the file */
1360  timer[4] = GetTimeStamp();
1361  backend->close(fd, params);
1362 
1363  timer[5] = GetTimeStamp();
1364  MPI_CHECK(MPI_Barrier(testComm), "barrier error");
1365 
1366  /* get the size of the file just written */
1367  results[rep].write.aggFileSizeFromStat =
1368  backend->get_file_size(params, testComm, testFileName);
1369 
1370  /* check if stat() of file doesn't equal expected file size,
1371  use actual amount of byte moved */
1372  CheckFileSize(test, dataMoved, rep, WRITE);
1373 
1374  if (verbose >= VERBOSE_3)
1375  WriteTimes(params, timer, rep, WRITE);
1376  ReduceIterResults(test, timer, rep, WRITE);
1377  if (params->outlierThreshold) {
1378  CheckForOutliers(params, timer, WRITE);
1379  }
1380 
1381  /* check if in this round we run write with stonewalling */
 /* the later phases of this repetition then stop after as many transfers as the write recorded */
1382  if(params->deadlineForStonewalling > 0){
1383  params->stoneWallingWearOutIterations = results[rep].write.pairs_accessed;
1384  }
1385  }
1386 
1387  /*
1388  * perform a check of data, reading back data and comparing
1389  * against what was expected to be written
1390  */
1391  if (params->checkWrite && !test_time_elapsed(params, startTime)) {
1392  MPI_CHECK(MPI_Barrier(testComm), "barrier error");
1393  if (rank == 0 && verbose >= VERBOSE_1) {
1394  fprintf(out_logfile,
1395  "Verifying contents of the file(s) just written.\n");
1396  fprintf(out_logfile, "%s\n", CurrentTimeString());
1397  }
1398  if (params->reorderTasks) {
1399  /* move two nodes away from writing node */
1400  int shift = 1; /* assume a by-node (round-robin) mapping of tasks to nodes */
1401  if (params->tasksBlockMapping) {
1402  shift = params->numTasksOnNode0; /* switch to by-slot (contiguous block) mapping */
1403  }
1404  rankOffset = (2 * shift) % params->numTasks;
1405  }
1406 
1407  // update the check buffer
1408  FillBuffer(ioBuffers.readCheckBuffer, params, 0, (rank + rankOffset) % params->numTasks);
1409 
1410  reseed_incompressible_prng = TRUE; /* Re-Seed the PRNG to get same sequence back, if random */
1411 
1412  GetTestFileName(testFileName, params);
1413  params->open = WRITECHECK;
1414  fd = backend->open(testFileName, params);
1415  dataMoved = WriteOrRead(params, &results[rep], fd, WRITECHECK, &ioBuffers);
1416  backend->close(fd, params);
 /* undo the shift applied for the check */
1417  rankOffset = 0;
1418  }
1419  /*
1420  * read the file(s), getting timing between I/O calls
1421  */
1422  if ((params->readFile || params->checkRead ) && !test_time_elapsed(params, startTime)) {
1423  /* check for stonewall */
1424  if(params->stoneWallingStatusFile){
 /* NOTE(review): listing line 1425 is absent here; presumably it loads
  * stoneWallingWearOutIterations from the status file (the warning below
  * speaks of reading the status back) -- confirm against the full source */
1426  if(params->stoneWallingWearOutIterations == -1 && rank == 0){
1427  fprintf(out_logfile, "WARNING: Could not read back the stonewalling status from the file!\n");
1428  params->stoneWallingWearOutIterations = 0;
1429  }
1430  }
1431  int operation_flag = READ;
1432  if ( params->checkRead ){
1433  // actually read and then compare the buffer
1434  operation_flag = READCHECK;
1435  }
1436  /* Get rankOffset [file offset] for this process to read, based on -C,-Z,-Q,-X options */
1437  /* Constant process offset reading */
1438  if (params->reorderTasks) {
1439  /* move one node away from writing node */
1440  int shift = 1; /* assume a by-node (round-robin) mapping of tasks to nodes */
1441  if (params->tasksBlockMapping) {
1442  shift=params->numTasksOnNode0; /* switch to a by-slot (contiguous block) mapping */
1443  }
1444  rankOffset = (params->taskPerNodeOffset * shift) % params->numTasks;
1445  }
1446  /* random process offset reading */
1447  if (params->reorderTasksRandom) {
1448  /* this should not interfere with randomOffset within a file because GetOffsetArrayRandom */
1449  /* seeds every rand() call */
1450  int nodeoffset;
1451  unsigned int iseed0;
1452  nodeoffset = params->taskPerNodeOffset;
1453  nodeoffset = (nodeoffset < params->numNodes) ? nodeoffset : params->numNodes - 1;
 /* a negative seed additionally varies with the repetition number */
1454  if (params->reorderTasksRandomSeed < 0)
1455  iseed0 = -1 * params->reorderTasksRandomSeed + rep;
1456  else
1457  iseed0 = params->reorderTasksRandomSeed;
1458  srand(rank + iseed0);
1459  {
1460  rankOffset = rand() % params->numTasks;
1461  }
 /* redraw until the offset is at least nodeoffset * numTasksOnNode0 */
1462  while (rankOffset <
1463  (nodeoffset * params->numTasksOnNode0)) {
1464  rankOffset = rand() % params->numTasks;
1465  }
1466  /* Get more detailed stats if requested by verbose level */
1467  if (verbose >= VERBOSE_2) {
1468  file_hits_histogram(params);
1469  }
1470  }
1471  if(operation_flag == READCHECK){
1472  FillBuffer(ioBuffers.readCheckBuffer, params, 0, (rank + rankOffset) % params->numTasks);
1473  }
1474 
1475  /* Using globally passed rankOffset, following function generates testFileName to read */
1476  GetTestFileName(testFileName, params);
1477 
1478  if (verbose >= VERBOSE_3) {
1479  fprintf(out_logfile, "task %d reading %s\n", rank,
1480  testFileName);
1481  }
1482  DelaySecs(params->interTestDelay);
1483  MPI_CHECK(MPI_Barrier(testComm), "barrier error");
1484  params->open = READ;
 /* timer[] is reused: same six slots as in the write phase */
1485  timer[0] = GetTimeStamp();
1486  fd = backend->open(testFileName, params);
1487  timer[1] = GetTimeStamp();
1488  if (params->intraTestBarriers)
1489  MPI_CHECK(MPI_Barrier(testComm),
1490  "barrier error");
1491  if (rank == 0 && verbose >= VERBOSE_1) {
1492  fprintf(out_logfile,
1493  "Commencing read performance test: %s\n",
1494  CurrentTimeString());
1495  }
1496  timer[2] = GetTimeStamp();
1497  dataMoved = WriteOrRead(params, &results[rep], fd, operation_flag, &ioBuffers);
1498  timer[3] = GetTimeStamp();
1499  if (params->intraTestBarriers)
1500  MPI_CHECK(MPI_Barrier(testComm),
1501  "barrier error");
1502  timer[4] = GetTimeStamp();
1503  backend->close(fd, params);
1504  timer[5] = GetTimeStamp();
1505 
1506  /* get the size of the file just read */
1507  results[rep].read.aggFileSizeFromStat =
1508  backend->get_file_size(params, testComm,
1509  testFileName);
1510 
1511  /* check if stat() of file doesn't equal expected file size,
1512  use actual amount of byte moved */
1513  CheckFileSize(test, dataMoved, rep, READ);
1514 
1515  if (verbose >= VERBOSE_3)
1516  WriteTimes(params, timer, rep, READ);
1517  ReduceIterResults(test, timer, rep, READ);
1518  if (params->outlierThreshold) {
1519  CheckForOutliers(params, timer, READ);
1520  }
1521  }
1522 
 /* remove the test file unless keepFile is set, or an error was found and keepFileWithError is set */
1523  if (!params->keepFile
1524  && !(params->errorFound && params->keepFileWithError)) {
1525  double start, finish;
1526  start = GetTimeStamp();
1527  MPI_CHECK(MPI_Barrier(testComm), "barrier error");
1528  RemoveFile(testFileName, params->filePerProc, params);
1529  MPI_CHECK(MPI_Barrier(testComm), "barrier error");
1530  finish = GetTimeStamp();
1531  PrintRemoveTiming(start, finish, rep);
1532  } else {
1533  MPI_CHECK(MPI_Barrier(testComm), "barrier error");
1534  }
 /* leave a clean state for the next repetition */
1535  params->errorFound = FALSE;
1536  rankOffset = 0;
1537 
1538  PrintRepeatEnd();
1539  }
1540 
1541  MPI_CHECK(MPI_Comm_free(&testComm), "MPI_Comm_free() error");
1542 
1543  if (params->summary_every_test) {
 /* NOTE(review): listing lines 1544-1545 are absent, leaving this branch
  * empty; presumably the per-test summary output belongs here -- confirm
  * against the full source */
1546  } else {
1547  PrintShortSummary(test);
1548  }
1549 
1550  XferBuffersFree(&ioBuffers, params);
1551 
1552  if (hog_buf != NULL)
1553  free(hog_buf);
1554 
1555  /* Sync with the tasks that did not participate in this test */
1556  MPI_CHECK(MPI_Barrier(mpi_comm_world), "barrier error");
1557 
1558 }
1559 
1560 /*
1561  * Determine if valid tests from parameters.
1562  */
1563 static void ValidateTests(IOR_param_t * test)
1564 {
1565  IOR_param_t defaults;
1566  init_IOR_Param_t(&defaults);
1567 
1568  if (test->repetitions <= 0)
1569  WARN_RESET("too few test repetitions",
1570  test, &defaults, repetitions);
1571  if (test->numTasks <= 0)
1572  ERR("too few tasks for testing");
1573  if (test->interTestDelay < 0)
1574  WARN_RESET("inter-test delay must be nonnegative value",
1575  test, &defaults, interTestDelay);
1576  if (test->readFile != TRUE && test->writeFile != TRUE
1577  && test->checkRead != TRUE && test->checkWrite != TRUE)
1578  ERR("test must write, read, or check read/write file");
1579  if(! test->setTimeStampSignature && test->writeFile != TRUE && test->checkRead == TRUE)
1580  ERR("using readCheck only requires to write a timeStampSignature -- use -G");
1581  if (test->segmentCount < 0)
1582  ERR("segment count must be positive value");
1583  if ((test->blockSize % sizeof(IOR_size_t)) != 0)
1584  ERR("block size must be a multiple of access size");
1585  if (test->blockSize < 0)
1586  ERR("block size must be non-negative integer");
1587  if ((test->transferSize % sizeof(IOR_size_t)) != 0)
1588  ERR("transfer size must be a multiple of access size");
1589  if (test->setAlignment < 0)
1590  ERR("alignment must be non-negative integer");
1591  if (test->transferSize < 0)
1592  ERR("transfer size must be non-negative integer");
1593  if (test->transferSize == 0) {
1594  ERR("test will not complete with zero transfer size");
1595  } else {
1596  if ((test->blockSize % test->transferSize) != 0)
1597  ERR("block size must be a multiple of transfer size");
1598  }
1599  if (test->blockSize < test->transferSize)
1600  ERR("block size must not be smaller than transfer size");
1601 
1602  /* specific APIs */
1603  if ((strcasecmp(test->api, "MPIIO") == 0)
1604  && (test->blockSize < sizeof(IOR_size_t)
1605  || test->transferSize < sizeof(IOR_size_t)))
1606  ERR("block/transfer size may not be smaller than IOR_size_t for MPIIO");
1607  if ((strcasecmp(test->api, "HDF5") == 0)
1608  && (test->blockSize < sizeof(IOR_size_t)
1609  || test->transferSize < sizeof(IOR_size_t)))
1610  ERR("block/transfer size may not be smaller than IOR_size_t for HDF5");
1611  if ((strcasecmp(test->api, "NCMPI") == 0)
1612  && (test->blockSize < sizeof(IOR_size_t)
1613  || test->transferSize < sizeof(IOR_size_t)))
1614  ERR("block/transfer size may not be smaller than IOR_size_t for NCMPI");
1615  if ((test->useFileView == TRUE)
1616  && (sizeof(MPI_Aint) < 8) /* used for 64-bit datatypes */
1617  &&((test->numTasks * test->blockSize) >
1618  (2 * (IOR_offset_t) GIBIBYTE)))
1619  ERR("segment size must be < 2GiB");
1620  if ((strcasecmp(test->api, "POSIX") != 0) && test->singleXferAttempt)
1621  WARN_RESET("retry only available in POSIX",
1622  test, &defaults, singleXferAttempt);
1623  if (((strcasecmp(test->api, "POSIX") != 0)
1624  && (strcasecmp(test->api, "MPIIO") != 0)
1625  && (strcasecmp(test->api, "MMAP") != 0)
1626  && (strcasecmp(test->api, "HDFS") != 0)
1627  && (strcasecmp(test->api, "DFS") != 0)
1628  && (strcasecmp(test->api, "Gfarm") != 0)
1629  && (strcasecmp(test->api, "RADOS") != 0)
1630  && (strcasecmp(test->api, "CEPHFS") != 0)) && test->fsync)
1631  WARN_RESET("fsync() not supported in selected backend",
1632  test, &defaults, fsync);
1633  if ((strcasecmp(test->api, "MPIIO") != 0) && test->preallocate)
1634  WARN_RESET("preallocation only available in MPIIO",
1635  test, &defaults, preallocate);
1636  if ((strcasecmp(test->api, "MPIIO") != 0) && test->useFileView)
1637  WARN_RESET("file view only available in MPIIO",
1638  test, &defaults, useFileView);
1639  if ((strcasecmp(test->api, "MPIIO") != 0) && test->useSharedFilePointer)
1640  WARN_RESET("shared file pointer only available in MPIIO",
1641  test, &defaults, useSharedFilePointer);
1642  if ((strcasecmp(test->api, "MPIIO") == 0) && test->useSharedFilePointer)
1643  WARN_RESET("shared file pointer not implemented",
1644  test, &defaults, useSharedFilePointer);
1645  if ((strcasecmp(test->api, "MPIIO") != 0) && test->useStridedDatatype)
1646  WARN_RESET("strided datatype only available in MPIIO",
1647  test, &defaults, useStridedDatatype);
1648  if ((strcasecmp(test->api, "MPIIO") == 0) && test->useStridedDatatype)
1649  WARN_RESET("strided datatype not implemented",
1650  test, &defaults, useStridedDatatype);
1651  if ((strcasecmp(test->api, "MPIIO") == 0)
1652  && test->useStridedDatatype && (test->blockSize < sizeof(IOR_size_t)
1653  || test->transferSize <
1654  sizeof(IOR_size_t)))
1655  ERR("need larger file size for strided datatype in MPIIO");
1656  if ((strcasecmp(test->api, "POSIX") == 0) && test->showHints)
1657  WARN_RESET("hints not available in POSIX",
1658  test, &defaults, showHints);
1659  if ((strcasecmp(test->api, "POSIX") == 0) && test->collective)
1660  WARN_RESET("collective not available in POSIX",
1661  test, &defaults, collective);
1662  if ((strcasecmp(test->api, "MMAP") == 0) && test->fsyncPerWrite
1663  && (test->transferSize & (sysconf(_SC_PAGESIZE) - 1)))
1664  ERR("transfer size must be aligned with PAGESIZE for MMAP with fsyncPerWrite");
1665 
1666  /* parameter consitency */
1667  if (test->reorderTasks == TRUE && test->reorderTasksRandom == TRUE)
1668  ERR("Both Constant and Random task re-ordering specified. Choose one and resubmit");
1669  if (test->randomOffset && test->reorderTasksRandom
1670  && test->filePerProc == FALSE)
1671  ERR("random offset and random reorder tasks specified with single-shared-file. Choose one and resubmit");
1672  if (test->randomOffset && test->reorderTasks
1673  && test->filePerProc == FALSE)
1674  ERR("random offset and constant reorder tasks specified with single-shared-file. Choose one and resubmit");
1675  if (test->randomOffset && test->checkRead)
1676  ERR("random offset not available with read check option (use write check)");
1677  if (test->randomOffset && test->storeFileOffset)
1678  ERR("random offset not available with store file offset option)");
1679 
1680 
1681  if ((strcasecmp(test->api, "MPIIO") == 0) && test->randomOffset
1682  && test->collective)
1683  ERR("random offset not available with collective MPIIO");
1684  if ((strcasecmp(test->api, "MPIIO") == 0) && test->randomOffset
1685  && test->useFileView)
1686  ERR("random offset not available with MPIIO fileviews");
1687  if ((strcasecmp(test->api, "HDF5") == 0) && test->randomOffset)
1688  ERR("random offset not available with HDF5");
1689  if ((strcasecmp(test->api, "NCMPI") == 0) && test->randomOffset)
1690  ERR("random offset not available with NCMPI");
1691  if ((strcasecmp(test->api, "HDF5") != 0) && test->individualDataSets)
1692  WARN_RESET("individual datasets only available in HDF5",
1693  test, &defaults, individualDataSets);
1694  if ((strcasecmp(test->api, "HDF5") == 0) && test->individualDataSets)
1695  WARN_RESET("individual data sets not implemented",
1696  test, &defaults, individualDataSets);
1697  if ((strcasecmp(test->api, "NCMPI") == 0) && test->filePerProc)
1698  ERR("file-per-proc not available in current NCMPI");
1699  if (test->noFill) {
1700  if (strcasecmp(test->api, "HDF5") != 0) {
1701  ERR("'no fill' option only available in HDF5");
1702  } else {
1703  /* check if hdf5 available */
1704 #if defined (H5_VERS_MAJOR) && defined (H5_VERS_MINOR)
1705  /* no-fill option not available until hdf5-1.6.x */
1706 #if (H5_VERS_MAJOR > 0 && H5_VERS_MINOR > 5)
1707  ;
1708 #else
1709  ERRF("'no fill' option not available in %s",
1710  test->apiVersion);
1711 #endif
1712 #else
1713  WARN("unable to determine HDF5 version for 'no fill' usage");
1714 #endif
1715  }
1716  }
1717  if (test->useExistingTestFile && test->lustre_set_striping)
1718  ERR("Lustre stripe options are incompatible with useExistingTestFile");
1719 
1720  /* allow the backend to validate the options */
1721  if(test->backend->check_params){
1722  int check = test->backend->check_params(test);
1723  if (check == 0){
1724  ERR("The backend returned that the test parameters are invalid.");
1725  }
1726  }
1727 }
1728 
1737 {
1738  IOR_offset_t i, j, k = 0;
1739  IOR_offset_t offsets;
1740  IOR_offset_t *offsetArray;
1741 
1742  /* count needed offsets */
1743  offsets = (test->blockSize / test->transferSize) * test->segmentCount;
1744 
1745  /* setup empty array */
1746  offsetArray =
1747  (IOR_offset_t *) malloc((offsets + 1) * sizeof(IOR_offset_t));
1748  if (offsetArray == NULL)
1749  ERR("malloc() failed");
1750  offsetArray[offsets] = -1; /* set last offset with -1 */
1751 
1752  /* fill with offsets */
1753  for (i = 0; i < test->segmentCount; i++) {
1754  for (j = 0; j < (test->blockSize / test->transferSize); j++) {
1755  offsetArray[k] = j * test->transferSize;
1756  if (test->filePerProc) {
1757  offsetArray[k] += i * test->blockSize;
1758  } else {
1759  offsetArray[k] +=
1760  (i * test->numTasks * test->blockSize)
1761  + (pretendRank * test->blockSize);
1762  }
1763  k++;
1764  }
1765  }
1766 
1767  return (offsetArray);
1768 }
1769 
1785 IOR_offset_t *GetOffsetArrayRandom(IOR_param_t * test, int pretendRank, int access)
1786 {
1787  int seed;
1788  IOR_offset_t i, value, tmp;
1789  IOR_offset_t offsets = 0;
1790  IOR_offset_t offsetCnt = 0;
1791  IOR_offset_t fileSize;
1792  IOR_offset_t *offsetArray;
1793 
1794  /* set up seed for random() */
1795  if (access == WRITE || access == READ) {
1796  test->randomSeed = seed = rand();
1797  } else {
1798  seed = test->randomSeed;
1799  }
1800  srand(seed);
1801 
1802  fileSize = test->blockSize * test->segmentCount;
1803  if (test->filePerProc == FALSE) {
1804  fileSize *= test->numTasks;
1805  }
1806 
1807  /* count needed offsets (pass 1) */
1808  for (i = 0; i < fileSize; i += test->transferSize) {
1809  if (test->filePerProc == FALSE) {
1810  // this counts which process get how many transferes in
1811  // a shared file
1812  if ((rand() % test->numTasks) == pretendRank) {
1813  offsets++;
1814  }
1815  } else {
1816  offsets++;
1817  }
1818  }
1819 
1820  /* setup empty array */
1821  offsetArray =
1822  (IOR_offset_t *) malloc((offsets + 1) * sizeof(IOR_offset_t));
1823  if (offsetArray == NULL)
1824  ERR("malloc() failed");
1825  offsetArray[offsets] = -1; /* set last offset with -1 */
1826 
1827  if (test->filePerProc) {
1828  /* fill array */
1829  for (i = 0; i < offsets; i++) {
1830  offsetArray[i] = i * test->transferSize;
1831  }
1832  } else {
1833  /* fill with offsets (pass 2) */
1834  srand(seed); /* need same seed to get same transfers as counted in the beginning*/
1835  for (i = 0; i < fileSize; i += test->transferSize) {
1836  if ((rand() % test->numTasks) == pretendRank) {
1837  offsetArray[offsetCnt] = i;
1838  offsetCnt++;
1839  }
1840  }
1841  }
1842  /* reorder array */
1843  for (i = 0; i < offsets; i++) {
1844  value = rand() % offsets;
1845  tmp = offsetArray[value];
1846  offsetArray[value] = offsetArray[i];
1847  offsetArray[i] = tmp;
1848  }
1849  SeedRandGen(test->testComm); /* synchronize seeds across tasks */
1850 
1851  return (offsetArray);
1852 }
1853 
1854 static IOR_offset_t WriteOrReadSingle(IOR_offset_t pairCnt, IOR_offset_t *offsetArray, int pretendRank,
1855  IOR_offset_t * transferCount, int * errors, IOR_param_t * test, int * fd, IOR_io_buffers* ioBuffers, int access){
1856  IOR_offset_t amtXferred = 0;
1857  IOR_offset_t transfer;
1858 
1859  void *buffer = ioBuffers->buffer;
1860  void *checkBuffer = ioBuffers->checkBuffer;
1861  void *readCheckBuffer = ioBuffers->readCheckBuffer;
1862 
1863  test->offset = offsetArray[pairCnt];
1864 
1865  transfer = test->transferSize;
1866  if (access == WRITE) {
1867  /* fills each transfer with a unique pattern
1868  * containing the offset into the file */
1869  if (test->storeFileOffset == TRUE) {
1870  FillBuffer(buffer, test, test->offset, pretendRank);
1871  }
1872  amtXferred =
1873  backend->xfer(access, fd, buffer, transfer, test);
1874  if (amtXferred != transfer)
1875  ERR("cannot write to file");
1876  if (test->interIODelay > 0){
1877  struct timespec wait = {test->interIODelay / 1000 / 1000, 1000l * (test->interIODelay % 1000000)};
1878  nanosleep( & wait, NULL);
1879  }
1880  } else if (access == READ) {
1881  amtXferred =
1882  backend->xfer(access, fd, buffer, transfer, test);
1883  if (amtXferred != transfer)
1884  ERR("cannot read from file");
1885  if (test->interIODelay > 0){
1886  struct timespec wait = {test->interIODelay / 1000 / 1000, 1000l * (test->interIODelay % 1000000)};
1887  nanosleep( & wait, NULL);
1888  }
1889  } else if (access == WRITECHECK) {
1890  memset(checkBuffer, 'a', transfer);
1891 
1892  if (test->storeFileOffset == TRUE) {
1893  FillBuffer(readCheckBuffer, test, test->offset, pretendRank);
1894  }
1895 
1896  amtXferred = backend->xfer(access, fd, checkBuffer, transfer, test);
1897  if (amtXferred != transfer)
1898  ERR("cannot read from file write check");
1899  (*transferCount)++;
1900  *errors += CompareBuffers(readCheckBuffer, checkBuffer, transfer,
1901  *transferCount, test,
1902  WRITECHECK);
1903  } else if (access == READCHECK) {
1904  memset(checkBuffer, 'a', transfer);
1905 
1906  amtXferred = backend->xfer(access, fd, checkBuffer, transfer, test);
1907  if (amtXferred != transfer){
1908  ERR("cannot read from file");
1909  }
1910  if (test->storeFileOffset == TRUE) {
1911  FillBuffer(readCheckBuffer, test, test->offset, pretendRank);
1912  }
1913  *errors += CompareBuffers(readCheckBuffer, checkBuffer, transfer, *transferCount, test, READCHECK);
1914  }
1915  return amtXferred;
1916 }
1917 
1918 /*
1919  * Write or Read data to file(s). This loops through the strides, writing
1920  * out the data to each block in transfer sizes, until the remainder left is 0.
1921  */
1923  void *fd, const int access, IOR_io_buffers *ioBuffers)
1924 {
1925  int errors = 0;
1926  IOR_offset_t transferCount = 0;
1927  uint64_t pairCnt = 0;
1928  IOR_offset_t *offsetArray;
1929  int pretendRank;
1930  IOR_offset_t dataMoved = 0; /* for data rate calculation */
1931  double startForStonewall;
1932  int hitStonewall;
1933  IOR_point_t *point = ((access == WRITE) || (access == WRITECHECK)) ?
1934  &results->write : &results->read;
1935 
1936  /* initialize values */
1937  pretendRank = (rank + rankOffset) % test->numTasks;
1938 
1939  if (test->randomOffset) {
1940  offsetArray = GetOffsetArrayRandom(test, pretendRank, access);
1941  } else {
1942  offsetArray = GetOffsetArraySequential(test, pretendRank);
1943  }
1944 
1945  startForStonewall = GetTimeStamp();
1946  hitStonewall = 0;
1947 
1948  /* loop over offsets to access */
1949  while ((offsetArray[pairCnt] != -1) && !hitStonewall ) {
1950  dataMoved += WriteOrReadSingle(pairCnt, offsetArray, pretendRank, & transferCount, & errors, test, fd, ioBuffers, access);
1951  pairCnt++;
1952 
1953  hitStonewall = ((test->deadlineForStonewalling != 0
1954  && (GetTimeStamp() - startForStonewall)
1955  > test->deadlineForStonewalling)) || (test->stoneWallingWearOutIterations != 0 && pairCnt == test->stoneWallingWearOutIterations) ;
1956 
1957  if ( test->collective && test->deadlineForStonewalling ) {
1958  // if collective-mode, you'll get a HANG, if some rank 'accidentally' leave this loop
1959  // it absolutely must be an 'all or none':
1960  MPI_CHECK(MPI_Bcast(&hitStonewall, 1, MPI_INT, 0, MPI_COMM_WORLD), "hitStonewall broadcast failed");
1961  }
1962 
1963  }
1964  if (test->stoneWallingWearOut){
1965  if (verbose >= VERBOSE_1){
1966  fprintf(out_logfile, "%d: stonewalling pairs accessed: %lld\n", rank, (long long) pairCnt);
1967  }
1968  long long data_moved_ll = (long long) dataMoved;
1969  long long pairs_accessed_min = 0;
1970  MPI_CHECK(MPI_Allreduce(& pairCnt, &point->pairs_accessed,
1971  1, MPI_LONG_LONG_INT, MPI_MAX, testComm), "cannot reduce pairs moved");
1972  double stonewall_runtime = GetTimeStamp() - startForStonewall;
1973  point->stonewall_time = stonewall_runtime;
1974  MPI_CHECK(MPI_Reduce(& pairCnt, & pairs_accessed_min,
1975  1, MPI_LONG_LONG_INT, MPI_MIN, 0, testComm), "cannot reduce pairs moved");
1976  MPI_CHECK(MPI_Reduce(& data_moved_ll, &point->stonewall_min_data_accessed,
1977  1, MPI_LONG_LONG_INT, MPI_MIN, 0, testComm), "cannot reduce pairs moved");
1978  MPI_CHECK(MPI_Reduce(& data_moved_ll, &point->stonewall_avg_data_accessed,
1979  1, MPI_LONG_LONG_INT, MPI_SUM, 0, testComm), "cannot reduce pairs moved");
1980 
1981  if(rank == 0){
1982  fprintf(out_logfile, "stonewalling pairs accessed min: %lld max: %zu -- min data: %.1f GiB mean data: %.1f GiB time: %.1fs\n",
1983  pairs_accessed_min, point->pairs_accessed,
1984  point->stonewall_min_data_accessed /1024.0 / 1024 / 1024, point->stonewall_avg_data_accessed / 1024.0 / 1024 / 1024 / test->numTasks , point->stonewall_time);
1985  point->stonewall_min_data_accessed *= test->numTasks;
1986  }
1987  if(pairCnt != point->pairs_accessed){
1988  // some work needs still to be done !
1989  for(; pairCnt < point->pairs_accessed; pairCnt++ ) {
1990  dataMoved += WriteOrReadSingle(pairCnt, offsetArray, pretendRank, & transferCount, & errors, test, fd, ioBuffers, access);
1991  }
1992  }
1993  }else{
1994  point->pairs_accessed = pairCnt;
1995  }
1996 
1997 
1998  totalErrorCount += CountErrors(test, access, errors);
1999 
2000  free(offsetArray);
2001 
2002  if (access == WRITE && test->fsync == TRUE) {
2003  backend->fsync(fd, test); /*fsync after all accesses */
2004  }
2005  return (dataMoved);
2006 }
int reorderTasks
Definition: ior.h:112
int uniqueDir
Definition: ior.h:134
IOR_offset_t setAlignment
Definition: ior.h:173
IOR_offset_t(* get_file_size)(IOR_param_t *, MPI_Comm, char *)
Definition: aiori.h:79
int GetNumTasks(MPI_Comm comm)
Definition: utilities.c:311
int quitOnError
Definition: ior.h:121
int reorderTasksRandomSeed
Definition: ior.h:115
int ior_main(int argc, char **argv)
Definition: ior.c:101
size_t pairs_accessed
Definition: ior.h:216
int showHints
Definition: ior.h:132
long long stonewall_avg_data_accessed
Definition: ior.h:220
char * hdfs_user
Definition: ior.h:176
void(* delete)(char *, IOR_param_t *)
Definition: aiori.h:76
int errors
Definition: ior.h:228
static IOR_offset_t WriteOrRead(IOR_param_t *test, IOR_results_t *results, void *fd, const int access, IOR_io_buffers *ioBuffers)
Definition: ior.c:1922
int multiFile
Definition: ior.h:105
#define ERR(MSG)
Definition: iordef.h:184
IOR_offset_t * GetOffsetArraySequential(IOR_param_t *test, int pretendRank)
Definition: ior.c:1736
static void file_hits_histogram(IOR_param_t *params)
Definition: ior.c:1070
static void DisplayOutliers(int numTasks, double timerVal, char *timeString, int access, int outlierThreshold)
Definition: ior.c:233
void PrintTestEnds()
Definition: ior-output.c:212
unsigned int incompressibleSeed
Definition: ior.h:149
#define VERBOSE_0
Definition: iordef.h:101
char * GetPlatformName()
Definition: ior.c:663
IOR_offset_t aggFileSizeFromStat
Definition: ior.h:222
unsigned int timeStampSignatureValue
Definition: ior.h:146
int filePerProc
Definition: ior.h:111
void PrintRepeatStart()
Definition: ior-output.c:203
static int size
Definition: mdtest.c:91
#define VERBOSE_3
Definition: iordef.h:104
double stonewall_time
Definition: ior.h:218
int noFill
Definition: ior.h:172
static void InitTests(IOR_test_t *, MPI_Comm)
Definition: ior.c:947
int repetitions
Definition: ior.h:103
int64_t ReadStoneWallingIterations(char *const filename)
Definition: utilities.c:795
IOR_offset_t segmentCount
Definition: ior.h:123
int useStridedDatatype
Definition: ior.h:131
static void aligned_buffer_free(void *buf)
Definition: ior.c:503
#define WARN_RESET(MSG, TO_STRUCT_PTR, FROM_STRUCT_PTR, MEMBER)
Definition: iordef.h:134
void * checkBuffer
Definition: ior.h:62
int keepFile
Definition: ior.h:118
void PrintHeader(int argc, char **argv)
Definition: ior-output.c:253
char ** environ
static void XferBuffersFree(IOR_io_buffers *ioBuffers, IOR_param_t *test)
Definition: ior.c:1028
int checkRead
Definition: ior.h:117
void PrintLongSummaryOneTest(IOR_test_t *test)
Definition: ior-output.c:641
int useSharedFilePointer
Definition: ior.h:130
int test_time_elapsed(IOR_param_t *params, double startTime)
Definition: ior.c:1115
int numTasksOnNode0
Definition: ior.h:101
void FreeResults(IOR_test_t *test)
Definition: ior.c:518
static void ValidateTests(IOR_param_t *)
Definition: ior.c:1563
IOR_offset_t transferSize
Definition: ior.h:125
size_t memoryPerNode
Definition: ior.h:152
#define WRITECHECK
Definition: iordef.h:96
int(* check_params)(IOR_param_t *)
Definition: aiori.h:89
IOR_param_t params
Definition: ior.h:235
void PrintLongSummaryHeader()
Definition: ior-output.c:651
#define READCHECK
Definition: iordef.h:98
int storeFileOffset
Definition: ior.h:136
int errorFound
Definition: ior.h:120
IOR_offset_t aggFileSizeFromXfer
Definition: ior.h:223
double sd
Definition: ior-internal.h:37
int QueryNodeMapping(MPI_Comm comm, int print_nodemap)
Definition: utilities.c:230
static IOR_offset_t WriteOrReadSingle(IOR_offset_t pairCnt, IOR_offset_t *offsetArray, int pretendRank, IOR_offset_t *transferCount, int *errors, IOR_param_t *test, int *fd, IOR_io_buffers *ioBuffers, int access)
Definition: ior.c:1854
static int totalErrorCount
Definition: ior.c:48
size_t part_number
Definition: ior.h:184
char * apiVersion
Definition: ior.h:91
static void * HogMemory(IOR_param_t *params)
Definition: ior.c:1130
int summary_every_test
Definition: ior.h:133
static void DestroyTest(IOR_test_t *test)
Definition: ior.c:545
static void ReduceIterResults(IOR_test_t *test, double *timer, const int rep, const int access)
Definition: ior.c:850
int numNodes
Definition: ior.h:100
int setTimeStampSignature
Definition: ior.h:145
int hdfs_replicas
Definition: ior.h:180
unsigned int openFlags
Definition: ior.h:88
int fsyncPerWrite
Definition: ior.h:162
int interTestDelay
Definition: ior.h:106
#define GIBIBYTE
Definition: iordef.h:88
int(* access)(const char *path, int mode, IOR_param_t *param)
Definition: aiori.h:83
int lustre_start_ost
Definition: ior.h:197
#define WRITE
Definition: iordef.h:95
#define EWARN(MSG)
Definition: iordef.h:169
IOR_test_t * ior_run(int argc, char **argv, MPI_Comm world_com, FILE *world_out)
Definition: ior.c:61
int maxTimeDuration
Definition: ior.h:142
char * testFileName
Definition: ior.h:93
void(* close)(void *, IOR_param_t *)
Definition: aiori.h:75
#define VERBOSE_5
Definition: iordef.h:106
char * stoneWallingStatusFile
Definition: ior.h:140
unsigned int mode
Definition: ior.h:87
void ShowTestStart(IOR_param_t *params)
Definition: ior-output.c:320
#define READ
Definition: iordef.h:97
MPI_Comm testComm
Definition: ior.h:166
int taskPerNodeOffset
Definition: ior.h:113
void init_clock()
Definition: utilities.c:775
#define IOR_CREAT
Definition: aiori.h:38
static char ** ParseFileName(char *, int *)
Definition: ior.c:707
void *(* open)(char *, IOR_param_t *)
Definition: aiori.h:72
double sum
Definition: ior-internal.h:38
void aiori_initialize(IOR_test_t *tests)
Definition: aiori.c:262
int fsync
Definition: ior.h:163
double var
Definition: ior-internal.h:36
struct IOR_test_t * next
Definition: ior.h:237
IOR_offset_t * GetOffsetArrayRandom(IOR_param_t *test, int pretendRank, int access)
Definition: ior.c:1785
hdfsFS hdfs_fs
Definition: ior.h:179
#define IOR_IRGRP
Definition: aiori.h:49
double wall_clock_delta
Definition: utilities.c:720
tPort hdfs_name_node_port
Definition: ior.h:178
int outlierThreshold
Definition: ior.h:143
int intraTestBarriers
Definition: ior.h:210
void GetTestFileName(char *testFileName, IOR_param_t *test)
Definition: ior.c:748
MPI_Comm testComm
Definition: utilities.c:60
int reorderTasksRandom
Definition: ior.h:114
void aiori_finalize(IOR_test_t *tests)
Definition: aiori.c:280
int checkWrite
Definition: ior.h:116
IOR_point_t write
Definition: ior.h:229
unsigned int reseed_incompressible_prng
Definition: ior.c:625
void(* fsync)(void *, IOR_param_t *)
Definition: aiori.h:78
void ShowSetup(IOR_param_t *params)
Definition: ior-output.c:413
void SeedRandGen(MPI_Comm testComm)
Definition: utilities.c:678
Definition: ior.h:59
IOR_offset_t aggFileSizeForBW
Definition: ior.h:224
int verbose
Definition: ior.h:144
static void XferBuffersSetup(IOR_io_buffers *ioBuffers, IOR_param_t *test, int pretendRank)
Definition: ior.c:1010
char * CurrentTimeString(void)
Definition: utilities.c:184
void PrintRemoveTiming(double start, double finish, int rep)
Definition: ior-output.c:775
#define MPI_CHECK(MPI_STATUS, MSG)
Definition: iordef.h:224
static void FillBuffer(void *buffer, IOR_param_t *test, unsigned long long offset, int fillrank)
Definition: ior.c:628
double time
Definition: ior.h:215
IOR_point_t read
Definition: ior.h:230
static void RemoveFile(char *testFileName, int filePerProc, IOR_param_t *test)
Definition: ior.c:911
static void CheckForOutliers(IOR_param_t *test, const double *timer, const int access)
Definition: ior.c:276
IOR_offset_t expectedAggFileSize
Definition: ior.h:127
char * platform
Definition: ior.h:92
int GetNumNodes(MPI_Comm comm)
Definition: utilities.c:274
int singleXferAttempt
Definition: ior.h:161
static void DestroyTests(IOR_test_t *tests_head)
Definition: ior.c:551
Definition: ior.h:48
#define IOR_IRUSR
Definition: aiori.h:45
int interIODelay
Definition: ior.h:107
FILE * out_resultfile
Definition: utilities.c:63
double GetTimeStamp(void)
Definition: utilities.c:726
static void WriteTimes(IOR_param_t *test, const double *timer, const int iteration, const int access)
Definition: ior.c:1159
void PrintShortSummary(IOR_test_t *test)
Definition: ior-output.c:696
int stoneWallingWearOut
Definition: ior.h:138
static const ior_aiori_t * backend
Definition: ior.c:49
void PrintRepeatEnd()
Definition: ior-output.c:197
int(* mkdir)(const char *path, mode_t mode, IOR_param_t *param)
Definition: aiori.h:81
long long stonewall_min_data_accessed
Definition: ior.h:219
IOR_test_t * CreateTest(IOR_param_t *init_params, int test_num)
Definition: ior.c:529
#define IOR_IWGRP
Definition: aiori.h:50
char * URI
Definition: ior.h:183
static void TestIoSys(IOR_test_t *)
Definition: ior.c:1224
void * buffer
Definition: ior.h:61
void PrintTableHeader()
Definition: ior-output.c:18
void DistributeHints(void)
Definition: ior.c:564
void PrintLongSummaryAllTests(IOR_test_t *tests_head)
Definition: ior-output.c:670
static size_t CompareBuffers(void *expectedBuffer, void *unknownBuffer, size_t size, IOR_offset_t transferCount, IOR_param_t *test, int access)
Definition: ior.c:341
static char hostname[MAX_PATHLEN]
Definition: mdtest.c:97
void PrintReducedResult(IOR_test_t *test, int access, double bw, double iops, double latency, double *diff_subset, double totalTime, int rep)
Definition: ior-output.c:222
int keepFileWithError
Definition: ior.h:119
int randomSeed
Definition: ior.h:148
#define FALSE
Definition: iordef.h:71
int rankOffset
Definition: utilities.c:58
int useExistingTestFile
Definition: ior.h:135
enum PACKET_TYPE dataPacketType
Definition: ior.h:156
int beegfs_numTargets
Definition: ior.h:206
void init_IOR_Param_t(IOR_param_t *p)
Definition: ior.c:175
int useFileView
Definition: ior.h:129
int readFile
Definition: ior.h:109
void *(* create)(char *, IOR_param_t *)
Definition: aiori.h:70
long long int IOR_size_t
Definition: iordef.h:123
#define WARN(MSG)
Definition: iordef.h:144
void * readCheckBuffer
Definition: ior.h:63
int tasksBlockMapping
Definition: ior.h:102
int hdfs_block_size
Definition: ior.h:181
int randomOffset
Definition: ior.h:150
int numTasks
Definition: ior.h:99
size_t memoryPerTask
Definition: ior.h:151
const char * aiori_default(void)
Definition: aiori.c:349
#define VERBOSE_2
Definition: iordef.h:103
#define IOR_NB_TIMERS
Definition: ior.c:44
int individualDataSets
Definition: ior.h:171
int writeFile
Definition: ior.h:110
static void * aligned_buffer_alloc(size_t size)
Definition: ior.c:473
uint64_t stoneWallingWearOutIterations
Definition: ior.h:139
#define MAX_STR
Definition: iordef.h:108
#define MAX_HINTS
Definition: iordef.h:109
int collective
Definition: ior.h:122
IOR_offset_t offset
Definition: ior.h:126
static int CountErrors(IOR_param_t *test, int access, int errors)
Definition: ior.c:441
#define VERBOSE_4
Definition: iordef.h:105
#define MAX_PATHLEN
Definition: utilities.h:33
double mean
Definition: ior-internal.h:35
static void * malloc_and_touch(size_t size)
Definition: ior.c:1048
int open
Definition: ior.h:108
const struct ior_aiori * backend
Definition: ior.h:85
static void FillIncompressibleBuffer(void *buffer, IOR_param_t *test)
Definition: ior.c:611
static char * PrependDir(IOR_param_t *, char *)
Definition: ior.c:797
#define IOR_RDWR
Definition: aiori.h:36
void DelaySecs(int delay)
Definition: utilities.c:832
#define VERBOSE_1
Definition: iordef.h:102
IOR_results_t * results
Definition: ior.h:236
int verbose
Definition: utilities.c:59
IOR_test_t * ParseCommandLine(int argc, char **argv)
MPI_Comm mpi_comm_world
Definition: utilities.c:61
int preallocate
Definition: ior.h:128
int deadlineForStonewalling
Definition: ior.h:137
char * api
Definition: ior.h:90
#define FILENAME_DELIMITER
Definition: iordef.h:116
int repCounter
Definition: ior.h:104
static void CheckFileSize(IOR_test_t *test, IOR_offset_t dataMoved, int rep, const int access)
Definition: ior.c:298
FILE * out_logfile
Definition: utilities.c:62
#define ERRF(FORMAT,...)
Definition: iordef.h:175
long long int IOR_offset_t
Definition: iordef.h:122
#define IOR_IWUSR
Definition: aiori.h:46
int rank
Definition: utilities.c:57
int numTasks
IOR_offset_t blockSize
Definition: ior.h:124
int GetNumTasksOnNode0(MPI_Comm comm)
Definition: utilities.c:349
#define TRUE
Definition: iordef.h:75
IOR_offset_t(* xfer)(int, void *, IOR_size_t *, IOR_offset_t, IOR_param_t *)
Definition: aiori.h:73
int lustre_set_striping
Definition: ior.h:198
void ShowTestEnd(IOR_test_t *tptr)
Definition: ior-output.c:397
const char * hdfs_name_node
Definition: ior.h:177
void * safeMalloc(uint64_t size)
Definition: utilities.c:68
int beegfs_chunkSize
Definition: ior.h:207
#define NULL
Definition: iordef.h:79
int id
Definition: ior.h:209
void AllocResults(IOR_test_t *test)
Definition: ior.c:508