104 #include <curl/curl.h> 106 #include <libxml/parser.h> 107 #include <libxml/tree.h> 110 #include "aws4c_extra.h" 116 #define BUFF_SIZE 1024 129 # define IOR_CURL_INIT 0x01 130 # define IOR_CURL_NOCONTINUE 0x02 131 # define IOR_CURL_S3_EMC_EXT 0x04 134 # include <curl/curl.h> 192 .set_version = S3_SetVersion,
209 .set_version = S3_SetVersion,
221 AWS4C_CHECK( aws_init() );
231 #define CURL_ERR(MSG, CURL_ERRNO, PARAM) \ 233 fprintf(stdout, "ior ERROR: %s: %s (curl-errno=%d) (%s:%d)\n", \ 234 MSG, curl_easy_strerror(CURL_ERRNO), CURL_ERRNO, \ 235 __FILE__, __LINE__); \ 237 MPI_Abort((PARAM)->testComm, -1); \ 241 #define CURL_WARN(MSG, CURL_ERRNO) \ 243 fprintf(stdout, "ior WARNING: %s: %s (curl-errno=%d) (%s:%d)\n", \ 244 MSG, curl_easy_strerror(CURL_ERRNO), CURL_ERRNO, \ 245 __FILE__, __LINE__); \ 277 printf(
"-> s3_connect\n");
282 printf(
"<- s3_connect [nothing to do]\n");
301 aws_set_debug(param->
verbose >= 4);
302 aws_read_config(getenv(
"USER"));
303 aws_reuse_connections(1);
308 param->io_buf = aws_iobuf_new();
309 aws_iobuf_growth_size(param->io_buf, 1024*1024*1);
311 param->etags = aws_iobuf_new();
312 aws_iobuf_growth_size(param->etags, 1024*1024*8);
353 AWS4C_CHECK( s3_head(param->io_buf,
"") );
354 if ( param->io_buf->code == 404 ) {
355 printf(
" bucket '%s' doesn't exist\n", bucket_name);
357 AWS4C_CHECK( s3_put(param->io_buf,
"") );
358 AWS4C_CHECK_OK( param->io_buf );
359 printf(
"created bucket '%s'\n", bucket_name);
362 AWS4C_CHECK_OK( param->io_buf );
376 printf(
"<- s3_connect [success]\n");
384 printf(
"-> s3_disconnect\n");
390 printf(
"<- s3_disconnect\n");
401 aws_iobuf_reset(param->io_buf);
402 aws_iobuf_reset(param->etags);
440 unsigned char createFile,
441 int multi_part_upload_p ) {
444 printf(
"-> S3_Create_Or_Open('%s', ,%d, %d)\n",
445 testFileName, createFile, multi_part_upload_p);
453 fprintf( stdout,
"Opening in Exclusive mode is not implemented in S3\n" );
456 fprintf( stdout,
"Direct I/O mode is not implemented in S3\n" );
461 int n_to_1 = ! n_to_n;
465 if (! multi_part_upload_p)
469 else if (createFile) {
478 if (multi_part_upload_p) {
484 if ( n_to_n || (
rank == 0) ) {
488 aws_iobuf_reset(param->io_buf);
489 AWS4C_CHECK( s3_put(param->io_buf, testFileName) );
490 AWS4C_CHECK_OK( param->io_buf );
495 IOBuf* response = aws_iobuf_new();
496 AWS4C_CHECK( s3_post2(param->io_buf,
buff,
NULL, response) );
497 AWS4C_CHECK_OK( param->io_buf );
500 aws_iobuf_realloc(response);
501 xmlDocPtr doc = xmlReadMemory(response->first->buf,
502 response->first->len,
505 ERR_SIMPLE(
"Rank0 Failed to find POST response\n");
508 xmlNode* root_element = xmlDocGetRootElement(doc);
509 const char* upload_id = find_element_named(root_element, (
char*)
"UploadId");
511 ERR_SIMPLE(
"couldn't find 'UploadId' in returned XML\n");
514 printf(
"got UploadId = '%s'\n", upload_id);
516 const size_t upload_id_len = strlen(upload_id);
517 if (upload_id_len > MAX_UPLOAD_ID_SIZE) {
519 "UploadId length %d exceeds expected max (%d)",
520 upload_id_len, MAX_UPLOAD_ID_SIZE);
525 memcpy(param->
UploadId, upload_id, upload_id_len);
530 aws_iobuf_free(response);
534 MPI_Bcast(param->
UploadId, MAX_UPLOAD_ID_SIZE, MPI_BYTE, 0, param->
testComm);
538 MPI_Bcast(param->
UploadId, MAX_UPLOAD_ID_SIZE, MPI_BYTE, 0, param->
testComm);
548 fprintf( stdout,
"rank %d resetting\n",
552 aws_iobuf_reset(param->io_buf);
553 AWS4C_CHECK( s3_put(param->io_buf, testFileName) );
554 AWS4C_CHECK_OK( param->io_buf );
561 printf(
"<- S3_Create_Or_Open\n");
563 return ((
void *) testFileName );
572 printf(
"-> S3_Create\n");
576 printf(
"<- S3_Create\n");
584 printf(
"-> EMC_Create\n");
588 printf(
"<- EMC_Create\n");
602 printf(
"-> S3_Open\n");
607 printf(
"<- S3_Open( ... TRUE)\n");
613 printf(
"<- S3_Open( ... FALSE)\n");
622 printf(
"-> S3_Open\n");
627 printf(
"<- EMC_Open( ... TRUE)\n");
633 printf(
"<- EMC_Open( ... FALSE)\n");
720 int multi_part_upload_p ) {
723 printf(
"-> S3_Xfer(acc:%d, target:%s, buf:0x%llx, len:%llu, 0x%llx)\n",
724 access, (
char*)file, buffer, length, param);
727 char* fname = (
char*)file;
728 size_t remaining = (size_t)length;
729 char* data_ptr = (
char *)buffer;
734 int n_to_1 = (! n_to_n);
738 if (access ==
WRITE) {
741 fprintf( stdout,
"rank %d writing length=%lld to offset %lld\n",
744 param->
offset + length - remaining);
748 if (multi_part_upload_p) {
794 "%s?partNumber=%d&uploadId=%s",
795 fname, part_number, param->
UploadId);
807 aws_iobuf_reset(param->io_buf);
808 aws_iobuf_append_static(param->io_buf, data_ptr, remaining);
809 AWS4C_CHECK( s3_put(param->io_buf,
buff) );
810 AWS4C_CHECK_OK( param->io_buf );
822 fprintf( stdout,
"rank %d of %d (%s,%s) offset %lld, part# %lld --> ETag %s\n",
825 (n_to_1 ?
"N:1" :
"N:N"),
826 (segmented ?
"segmented" :
"strided"),
829 param->io_buf->eTag);
831 if (strlen(param->io_buf->eTag) !=
ETAG_SIZE+2) {
832 fprintf(stderr,
"Rank %d: ERROR: expected ETag to be %d hex digits\n",
841 aws_iobuf_append(param->etags,
842 param->io_buf->eTag +1,
843 strlen(param->io_buf->eTag) -2);
846 printf(
"rank %d: part %d = ETag %s\n",
rank, part_number, param->io_buf->eTag);
850 aws_iobuf_reset(param->io_buf);
858 s3_set_byte_range(-1,-1);
860 s3_set_byte_range(offset, remaining);
866 aws_iobuf_reset(param->io_buf);
867 aws_iobuf_append_static(param->io_buf, data_ptr, remaining);
868 AWS4C_CHECK ( s3_put(param->io_buf, file) );
869 AWS4C_CHECK_OK( param->io_buf );
872 aws_iobuf_reset(param->io_buf);
877 WARN(
"S3 doesn't support 'fsync'" );
884 fprintf( stdout,
"rank %d reading from offset %lld\n",
886 param->
offset + length - remaining );
891 s3_set_byte_range(offset, remaining);
898 aws_iobuf_reset(param->io_buf);
899 aws_iobuf_extend_static(param->io_buf, data_ptr, remaining);
900 AWS4C_CHECK( s3_get(param->io_buf, file) );
901 if (param->io_buf->code != 206) {
903 "Unexpected result (%d, '%s')",
904 param->io_buf->code, param->io_buf->result);
909 aws_iobuf_reset(param->io_buf);
914 printf(
"<- S3_Xfer\n");
979 printf(
"-> S3_Fsync [no-op]\n");
983 printf(
"<- S3_Fsync\n");
1017 int multi_part_upload_p ) {
1019 char* fname = (
char*)fd;
1023 int n_to_1 = (! n_to_n);
1027 printf(
"-> S3_Close('%s', ,%d) %s\n",
1029 multi_part_upload_p,
1030 ((n_to_n) ?
"N:N" : ((segmented) ?
"N:1(seg)" :
"N:1(str)")));
1037 if (multi_part_upload_p) {
1040 size_t etag_data_size = param->etags->write_count;
1041 size_t etags_per_rank = etag_data_size /
ETAG_SIZE;
1050 MPI_Datatype mpi_size_t;
1051 if (
sizeof(
size_t) ==
sizeof(
int))
1052 mpi_size_t = MPI_INT;
1053 else if (
sizeof(
size_t) ==
sizeof(
long))
1054 mpi_size_t = MPI_LONG;
1056 mpi_size_t = MPI_LONG_LONG;
1059 size_t etag_count_max = 0;
1060 MPI_Allreduce(&etags_per_rank, &etag_count_max,
1061 1, mpi_size_t, MPI_MAX, param->
testComm);
1062 if (etags_per_rank != etag_count_max) {
1063 printf(
"Rank %d: etag count mismatch: max:%d, mine:%d\n",
1064 rank, etag_count_max, etags_per_rank);
1069 aws_iobuf_realloc(param->etags);
1070 char* etag_data = param->etags->first->buf;
1078 char* etag_vec = (
char*)malloc((param->
numTasks * etag_data_size) +1);
1080 fprintf(stderr,
"rank 0 failed to malloc %d bytes\n",
1084 MPI_Gather(etag_data, etag_data_size, MPI_BYTE,
1085 etag_vec, etag_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);
1091 printf(
"rank 0: gathered %d etags from all ranks:\n", etags_per_rank);
1093 for (rnk=0; rnk<param->
numTasks; ++rnk) {
1094 printf(
"\t[%d]: '", rnk);
1097 for (ii=0; ii<etag_data_size; ++ii)
1098 printf(
"%c", etag_ptr[ii]);
1101 etag_ptr += etag_data_size;
1152 size_t start_multiplier;
1157 j_max = etags_per_rank;
1158 start_multiplier = etag_data_size;
1162 i_max = etags_per_rank;
1165 stride = etag_data_size;
1169 xml = aws_iobuf_new();
1170 aws_iobuf_growth_size(xml, 1024 * 8);
1173 aws_iobuf_append_str(xml,
"<CompleteMultipartUpload>\n");
1176 for (i=0; i<i_max; ++i) {
1178 etag_ptr=etag_vec + (i * start_multiplier);
1180 for (j=0; j<j_max; ++j) {
1190 " <PartNumber>%d</PartNumber>\n" 1191 " <ETag>%s</ETag>\n" 1195 aws_iobuf_append_str(xml,
buff);
1203 aws_iobuf_append_str(xml,
"</CompleteMultipartUpload>\n");
1207 MPI_Gather(etag_data, etag_data_size, MPI_BYTE,
1208 NULL, etag_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);
1214 xml = aws_iobuf_new();
1215 aws_iobuf_growth_size(xml, 1024 * 8);
1218 aws_iobuf_append_str(xml,
"<CompleteMultipartUpload>\n");
1224 for (i=0; i<etags_per_rank; ++i) {
1228 int sz = aws_iobuf_get_raw(param->etags, etag,
ETAG_SIZE);
1231 "Read of ETag %d had length %d (not %d)\n",
1241 " <PartNumber>%d</PartNumber>\n" 1242 " <ETag>%s</ETag>\n" 1246 aws_iobuf_append_str(xml,
buff);
1252 aws_iobuf_append_str(xml,
"</CompleteMultipartUpload>\n");
1258 if (n_to_n || (
rank == 0)) {
1262 debug_iobuf(xml, 1, 1);
1269 AWS4C_CHECK ( s3_post(xml,
buff) );
1270 AWS4C_CHECK_OK( xml );
1272 aws_iobuf_free(xml);
1293 printf(
"rank %d: passed barrier\n",
rank);
1299 aws_reset_connection();
1304 printf(
"<- S3_Close\n");
1337 printf(
"-> S3_Delete(%s)\n", testFileName);
1347 AWS4C_CHECK( s3_delete(param->io_buf, testFileName) );
1350 aws_iobuf_reset(param->io_buf);
1351 AWS4C_CHECK ( s3_put(param->io_buf, testFileName) );
1354 AWS4C_CHECK_OK( param->io_buf );
1357 printf(
"<- S3_Delete\n");
1366 printf(
"-> EMC_Delete(%s)\n", testFileName);
1376 AWS4C_CHECK( s3_delete(param->io_buf, testFileName) );
1379 aws_iobuf_reset(param->io_buf);
1380 AWS4C_CHECK ( s3_put(param->io_buf, testFileName) );
1383 AWS4C_CHECK_OK( param->io_buf );
1386 printf(
"<- EMC_Delete\n");
1407 char * testFileName) {
1410 printf(
"-> S3_GetFileSize(%s)\n", testFileName);
1421 AWS4C_CHECK( s3_head(param->io_buf, testFileName) );
1422 if ( ! AWS4C_OK(param->io_buf) ) {
1423 fprintf(stderr,
"rank %d: couldn't stat '%s': %s\n",
1424 rank, testFileName, param->io_buf->result);
1427 aggFileSizeFromStat = param->io_buf->contentLen;
1430 printf(
"\trank %d: file-size %llu\n",
rank, aggFileSizeFromStat);
1435 printf(
"\tall-reduce (1)\n");
1437 MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat,
1443 "cannot total data moved" );
1445 aggFileSizeFromStat = tmpSum;
1449 printf(
"\tall-reduce (2a)\n");
1451 MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat,
1457 "cannot total data moved" );
1460 printf(
"\tall-reduce (2b)\n");
1462 MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat,
1468 "cannot total data moved" );
1470 if ( tmpMin != tmpMax ) {
1472 WARN(
"inconsistent file size by different tasks" );
1476 aggFileSizeFromStat = tmpMin;
1481 printf(
"<- S3_GetFileSize [%llu]\n", aggFileSizeFromStat);
1483 return ( aggFileSizeFromStat );
static void S3_Fsync(void *, IOR_param_t *)
static void EMC_Close(void *, IOR_param_t *)
static IOR_offset_t EMC_Xfer(int, void *, IOR_size_t *, IOR_offset_t, IOR_param_t *)
static void * EMC_Create(char *, IOR_param_t *)
static void S3_finalize()
static void S3_Close(void *, IOR_param_t *)
IOR_offset_t segmentCount
static void s3_connect(IOR_param_t *param)
IOR_offset_t transferSize
static IOR_offset_t S3_GetFileSize(IOR_param_t *, MPI_Comm, char *)
static void EMC_Delete(char *testFileName, IOR_param_t *param)
char * aiori_get_version()
static void * S3_Create_Or_Open_internal(char *testFileName, IOR_param_t *param, unsigned char createFile, int multi_part_upload_p)
static IOR_offset_t S3_Xfer_internal(int access, void *file, IOR_size_t *buffer, IOR_offset_t length, IOR_param_t *param, int multi_part_upload_p)
static void * EMC_Open(char *, IOR_param_t *)
static char buff[BUFF_SIZE]
#define MPI_CHECK(MPI_STATUS, MSG)
static void S3_Close_internal(void *fd, IOR_param_t *param, int multi_part_upload_p)
static void * S3_Open(char *, IOR_param_t *)
static void * S3_Create(char *, IOR_param_t *)
static IOR_offset_t S3_Xfer(int, void *, IOR_size_t *, IOR_offset_t, IOR_param_t *)
static void s3_disconnect(IOR_param_t *param)
void s3_MPU_reset(IOR_param_t *param)
static void S3_Delete(char *, IOR_param_t *)
long long int IOR_offset_t
#define IOR_CURL_S3_EMC_EXT
ior_aiori_t s3_plus_aiori