69 # include <sys/ioctl.h> 97 # define lseek64 lseek 145 .enable_mdtest =
true 171 if (init_values !=
NULL){
176 hdfs_user = getenv(
"USER");
180 o->
user = strdup(hdfs_user);
195 memcpy(help, h,
sizeof(h));
203 return hdfsCreateDirectory(o->
fs, path);
209 return hdfsDelete(o->
fs, path, 1);
215 return hdfsExists(o->
fs, path);
222 stat = hdfsGetPathInfo(o->
fs, path);
226 memset(buf, 0,
sizeof(
struct stat));
227 buf->st_atime = stat->mLastAccess;
228 buf->st_size = stat->mSize;
229 buf->st_mtime = stat->mLastMod;
230 buf->st_mode = stat->mPermissions;
232 hdfsFreeFileInfo(stat, 1);
240 stat->
f_bsize = hdfsGetDefaultBlockSize(o->
fs);
241 stat->
f_blocks = hdfsGetCapacity(o->
fs) / hdfsGetDefaultBlockSize(o->
fs);
259 WARN(
"cannot use O_DIRECT");
260 # define O_DIRECT 000000 262 # define O_DIRECT O_DIRECTIO 282 printf(
"-> hdfs_connect [nn:\"%s\", port:%d, user:%s]\n",
290 printf(
"<- hdfs_connect [nothing to do]\n");
296 struct hdfsBuilder* builder = hdfsNewBuilder();
298 ERR(
"couldn't create an hdfsBuilder");
301 hdfsBuilderSetForceNewInstance ( builder );
303 hdfsBuilderSetNameNode ( builder, o->
name_node );
305 hdfsBuilderSetUserName ( builder, o->
user );
308 o->
fs = hdfsBuilderConnect( builder );
310 ERR(
"hdsfsBuilderConnect failed");
313 printf(
"<- hdfs_connect [success]\n");
319 printf(
"-> hdfs_disconnect\n");
322 hdfsDisconnect( o->
fs );
326 printf(
"<- hdfs_disconnect\n");
338 printf(
"-> HDFS_Create_Or_Open\n");
342 hdfsFile hdfs_file =
NULL;
343 int fd_oflags = 0, hdfs_return;
358 ERR(
"Opening or creating a file in RDWR is not implemented in HDFS" );
362 fprintf( stdout,
"Opening or creating a file in Exclusive mode is not implemented in HDFS\n" );
366 fprintf( stdout,
"Opening or creating a file for appending is not implemented in HDFS\n" );
373 if ( createFile ==
TRUE ) {
382 fd_oflags |= O_WRONLY;
384 fd_oflags |= O_TRUNC;
385 fd_oflags |= O_WRONLY;
390 fd_oflags |= O_TRUNC;
391 fd_oflags |= O_WRONLY;
395 fd_oflags |= O_RDONLY;
413 if (( flags & IOR_WRONLY ) && ( ! hints->
filePerProc ) && (
rank != 0 )) {
422 printf(
"\thdfsOpenFile(%p, %s, 0%o, %lld, %d, %lld)\n",
432 ERR(
"Failed to open the file" );
439 if (( flags & IOR_WRONLY ) &&
447 printf(
"<- HDFS_Create_Or_Open\n");
449 return ((
void *) hdfs_file );
458 printf(
"-> HDFS_Create\n");
462 printf(
"<- HDFS_Create\n");
472 printf(
"-> HDFS_Open\n");
477 printf(
"<- HDFS_Open( ... TRUE)\n");
483 printf(
"<- HDFS_Open( ... FALSE)\n");
496 printf(
"-> HDFS_Xfer(acc:%d, file:%p, buf:%p, len:%llu, %p)\n",
497 access, file, buffer, length, param);
501 long long remaining = (
long long)length;
502 char* ptr = (
char *)buffer;
505 hdfsFile hdfs_file = (hdfsFile)file;
508 while ( remaining > 0 ) {
511 if (access ==
WRITE) {
513 fprintf( stdout,
"task %d writing to offset %lld\n",
515 offset + length - remaining);
519 printf(
"\thdfsWrite( %p, %p, %p, %lld)\n",
520 hdfs_fs, hdfs_file, ptr, remaining );
522 rc = hdfsWrite( hdfs_fs, hdfs_file, ptr, remaining );
524 ERR(
"hdfsWrite() failed" );
534 fprintf( stdout,
"task %d reading from offset %lld\n",
535 rank, offset + length - remaining );
539 printf(
"\thdfsRead( %p, %p, %p, %lld)\n",
540 hdfs_fs, hdfs_file, ptr, remaining );
542 rc = hdfsPread(hdfs_fs, hdfs_file, offset, ptr, remaining);
544 ERR(
"hdfs_read() returned EOF prematurely" );
548 ERR(
"hdfs_read() failed" );
555 if ( rc < remaining ) {
556 fprintf(stdout,
"WARNING: Task %d, partial %s, %lld of %lld bytes at offset %lld\n",
558 access ==
WRITE ?
"hdfsWrite()" :
"hdfs_read()",
560 offset + length - remaining );
563 MPI_CHECK( MPI_Abort( MPI_COMM_WORLD, -1 ),
"barrier error" );
567 ERR(
"too many retries -- aborting" );
572 assert( rc <= remaining );
581 rc = hdfsHFlush(hdfs_fs, hdfs_file);
583 WARN(
"Error during flush");
588 printf(
"<- HDFS_Xfer\n");
599 hdfsFile hdfs_file = (hdfsFile)fd;
602 printf(
"\thdfsFlush(%p, %p)\n", hdfs_fs, hdfs_file);
604 if ( hdfsHSync( hdfs_fs, hdfs_file ) != 0 ) {
606 WARN(
"hdfsFlush() failed" );
616 printf(
"-> HDFS_Close\n");
621 hdfsFile hdfs_file = (hdfsFile)fd;
623 if ( hdfsCloseFile( hdfs_fs, hdfs_file ) != 0 ) {
624 ERR(
"hdfsCloseFile() failed" );
628 printf(
"<- HDFS_Close\n");
640 printf(
"-> HDFS_Delete\n");
650 ERR(
"Can't delete a file without an HDFS connection" );
652 if ( hdfsDelete( o->
fs, testFileName, 0 ) != 0 ) {
653 sprintf(errmsg,
"[RANK %03d]: hdfsDelete() of file \"%s\" failed\n",
659 printf(
"<- HDFS_Delete\n");
670 char * testFileName) {
672 printf(
"-> HDFS_GetFileSize(%s)\n", testFileName);
684 printf(
"\thdfsGetPathInfo(%s) ...", testFileName);
688 hdfsFileInfo* info = hdfsGetPathInfo( o->
fs, testFileName );
690 ERR(
"hdfsGetPathInfo() failed" );
692 printf(
"done.\n");fflush(stdout);
695 aggFileSizeFromStat = info->mSize;
698 printf(
"<- HDFS_GetFileSize [%llu]\n", aggFileSizeFromStat);
700 return ( aggFileSizeFromStat );
static void * HDFS_Create_Or_Open(char *testFileName, int flags, aiori_mod_opt_t *param, unsigned char createFile)
static int HDFS_access(const char *path, int mode, aiori_mod_opt_t *options)
static int HDFS_stat(const char *path, struct stat *buf, aiori_mod_opt_t *options)
static aiori_fd_t * HDFS_Create(char *testFileName, int flags, aiori_mod_opt_t *param)
static int HDFS_rmdir(const char *path, aiori_mod_opt_t *options)
static void HDFS_Delete(char *testFileName, aiori_mod_opt_t *param)
static void HDFS_Fsync(aiori_fd_t *, aiori_mod_opt_t *)
static aiori_xfer_hint_t * hints
struct benchmark_options o
static IOR_offset_t HDFS_Xfer(int access, aiori_fd_t *file, IOR_size_t *buffer, IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t *param)
#define MPI_CHECK(MPI_STATUS, MSG)
char * aiori_get_version()
static int HDFS_statfs(const char *path, ior_aiori_statfs_t *stat, aiori_mod_opt_t *options)
static option_help options[]
static void hdfs_xfer_hints(aiori_xfer_hint_t *params)
static int HDFS_mkdir(const char *path, mode_t mode, aiori_mod_opt_t *options)
void hdfs_set_o_direct_flag(int *fd)
IOR_offset_t transferSize
static option_help * HDFS_options(aiori_mod_opt_t **init_backend_options, aiori_mod_opt_t *init_values)
static void hdfs_connect(hdfs_options_t *o)
static void HDFS_Close(aiori_fd_t *, aiori_mod_opt_t *)
static aiori_fd_t * HDFS_Open(char *testFileName, int flags, aiori_mod_opt_t *param)
static void hdfs_disconnect(hdfs_options_t *o)
static IOR_offset_t HDFS_GetFileSize(aiori_mod_opt_t *, char *)
long long int IOR_offset_t