69 # include <sys/ioctl.h> 94 # define lseek64 lseek 140 WARN(
"cannot use O_DIRECT");
141 # define O_DIRECT 000000 143 # define O_DIRECT O_DIRECTIO 163 printf(
"-> hdfs_connect [nn:\"%s\", port:%d, user:%s]\n",
171 printf(
"<- hdfs_connect [nothing to do]\n");
177 struct hdfsBuilder* builder = hdfsNewBuilder();
181 hdfsBuilderSetForceNewInstance ( builder );
185 hdfsBuilderSetUserName ( builder, param->
hdfs_user );
188 param->
hdfs_fs = hdfsBuilderConnect( builder );
193 printf(
"<- hdfs_connect [success]\n");
199 printf(
"-> hdfs_disconnect\n");
202 hdfsDisconnect( param->
hdfs_fs );
206 printf(
"<- hdfs_disconnect\n");
218 printf(
"-> HDFS_Create_Or_Open\n");
221 hdfsFile hdfs_file =
NULL;
222 int fd_oflags = 0, hdfs_return;
237 ERR(
"Opening or creating a file in RDWR is not implemented in HDFS" );
241 fprintf( stdout,
"Opening or creating a file in Exclusive mode is not implemented in HDFS\n" );
245 fprintf( stdout,
"Opening or creating a file for appending is not implemented in HDFS\n" );
252 if ( createFile ==
TRUE ) {
261 fd_oflags |= O_WRONLY;
263 fd_oflags |= O_TRUNC;
264 fd_oflags |= O_WRONLY;
269 fd_oflags |= O_TRUNC;
270 fd_oflags |= O_WRONLY;
274 fd_oflags |= O_RDONLY;
304 printf(
"\thdfsOpenFile(0x%llx, %s, 0%o, %d, %d, %d)\n",
312 hdfs_file = hdfsOpenFile( param->
hdfs_fs,
319 ERR(
"Failed to open the file" );
334 printf(
"<- HDFS_Create_Or_Open\n");
336 return ((
void *) hdfs_file );
345 printf(
"-> HDFS_Create\n");
349 printf(
"<- HDFS_Create\n");
359 printf(
"-> HDFS_Open\n");
364 printf(
"<- HDFS_Open( ... TRUE)\n");
370 printf(
"<- HDFS_Open( ... FALSE)\n");
383 printf(
"-> HDFS_Xfer(acc:%d, file:0x%llx, buf:0x%llx, len:%llu, 0x%llx)\n",
384 access, file, buffer, length, param);
388 long long remaining = (
long long)length;
389 char* ptr = (
char *)buffer;
393 hdfsFile hdfs_file = (hdfsFile)file;
396 while ( remaining > 0 ) {
399 if (access ==
WRITE) {
401 fprintf( stdout,
"task %d writing to offset %lld\n",
403 param->
offset + length - remaining);
407 printf(
"\thdfsWrite( 0x%llx, 0x%llx, 0x%llx, %lld)\n",
408 hdfs_fs, hdfs_file, ptr, remaining );
410 rc = hdfsWrite( hdfs_fs, hdfs_file, ptr, remaining );
412 ERR(
"hdfsWrite() failed" );
423 fprintf( stdout,
"task %d reading from offset %lld\n",
425 param->
offset + length - remaining );
429 printf(
"\thdfsRead( 0x%llx, 0x%llx, 0x%llx, %lld)\n",
430 hdfs_fs, hdfs_file, ptr, remaining );
432 rc = hdfsRead( hdfs_fs, hdfs_file, ptr, remaining );
435 ERR(
"hdfs_read() returned EOF prematurely" );
439 ERR(
"hdfs_read() failed" );
446 if ( rc < remaining ) {
447 fprintf(stdout,
"WARNING: Task %d, partial %s, %lld of %lld bytes at offset %lld\n",
449 access ==
WRITE ?
"hdfsWrite()" :
"hdfs_read()",
451 param->
offset + length - remaining );
454 MPI_CHECK( MPI_Abort( MPI_COMM_WORLD, -1 ),
"barrier error" );
458 ERR(
"too many retries -- aborting" );
463 assert( rc <= remaining );
470 printf(
"<- HDFS_Xfer\n");
481 printf(
"-> HDFS_Fsync\n");
484 hdfsFile hdfs_file = (hdfsFile)fd;
488 printf(
"\thdfsHSync(0x%llx, 0x%llx)\n", hdfs_fs, hdfs_file);
490 if ( hdfsHSync( hdfs_fs, hdfs_file ) != 0 ) {
491 EWARN(
"hdfsHSync() failed" );
495 printf(
"\thdfsHFlush(0x%llx, 0x%llx)\n", hdfs_fs, hdfs_file);
497 if ( hdfsHFlush( hdfs_fs, hdfs_file ) != 0 ) {
498 EWARN(
"hdfsHFlush() failed" );
502 printf(
"\thdfsFlush(0x%llx, 0x%llx)\n", hdfs_fs, hdfs_file);
504 if ( hdfsFlush( hdfs_fs, hdfs_file ) != 0 ) {
505 EWARN(
"hdfsFlush() failed" );
510 printf(
"<- HDFS_Fsync\n");
520 printf(
"-> HDFS_Close\n");
524 hdfsFile hdfs_file = (hdfsFile)fd;
529 open_flags = O_CREAT | O_WRONLY;
531 open_flags = O_RDONLY;
534 if ( hdfsCloseFile( hdfs_fs, hdfs_file ) != 0 ) {
535 ERR(
"hdfsCloseFile() failed" );
539 printf(
"<- HDFS_Close\n");
551 printf(
"-> HDFS_Delete\n");
560 ERR_SIMPLE(
"Can't delete a file without an HDFS connection" );
562 if ( hdfsDelete( param->
hdfs_fs, testFileName, 0 ) != 0 ) {
564 "[RANK %03d]: hdfsDelete() of file \"%s\" failed\n",
570 printf(
"<- HDFS_Delete\n");
580 printf(
"-> HDFS_SetVersion\n");
585 printf(
"<- HDFS_SetVersion\n");
598 char * testFileName) {
600 printf(
"-> HDFS_GetFileSize(%s)\n", testFileName);
611 printf(
"\thdfsGetPathInfo(%s) ...", testFileName);fflush(stdout);
614 hdfsFileInfo* info = hdfsGetPathInfo( param->
hdfs_fs, testFileName );
618 printf(
"done.\n");fflush(stdout);
621 aggFileSizeFromStat = info->mSize;
625 printf(
"\tall-reduce (1)\n");
629 &aggFileSizeFromStat, &tmpSum, 1, MPI_LONG_LONG_INT, MPI_SUM, testComm ),
630 "cannot total data moved" );
632 aggFileSizeFromStat = tmpSum;
636 printf(
"\tall-reduce (2a)\n");
640 &aggFileSizeFromStat, &tmpMin, 1, MPI_LONG_LONG_INT, MPI_MIN, testComm ),
641 "cannot total data moved" );
644 printf(
"\tall-reduce (2b)\n");
648 &aggFileSizeFromStat, &tmpMax, 1, MPI_LONG_LONG_INT, MPI_MAX, testComm ),
649 "cannot total data moved" );
651 if ( tmpMin != tmpMax ) {
653 WARN(
"inconsistent file size by different tasks" );
657 aggFileSizeFromStat = tmpMin;
662 printf(
"<- HDFS_GetFileSize [%llu]\n", aggFileSizeFromStat);
664 return ( aggFileSizeFromStat );
static void HDFS_Delete(char *, IOR_param_t *)
IOR_offset_t transferSize
static IOR_offset_t HDFS_Xfer(int, void *, IOR_size_t *, IOR_offset_t, IOR_param_t *)
tPort hdfs_name_node_port
static void * HDFS_Create_Or_Open(char *testFileName, IOR_param_t *param, unsigned char createFile)
static void HDFS_SetVersion(IOR_param_t *)
#define MPI_CHECK(MPI_STATUS, MSG)
void hdfs_set_o_direct_flag(int *fd)
static void hdfs_connect(IOR_param_t *param)
static void * HDFS_Create(char *, IOR_param_t *)
static void hdfs_disconnect(IOR_param_t *param)
static IOR_offset_t HDFS_GetFileSize(IOR_param_t *, MPI_Comm, char *)
static void * HDFS_Open(char *, IOR_param_t *)
long long int IOR_offset_t
static void HDFS_Fsync(void *, IOR_param_t *)
const char * hdfs_name_node
static void HDFS_Close(void *, IOR_param_t *)