SDDSlib
Loading...
Searching...
No Matches
SDDS_MPI_binary.c
Go to the documentation of this file.
1/**
2 * @file SDDS_MPI_binary.c
3 * @brief Implementation of SDDS MPI Functions
4 *
5 * This source file contains the implementation of functions responsible for reading
6 * SDDS (Self Describing Data Set) datasets in binary format using MPI (Message Passing Interface).
7 * It handles both native and non-native byte orders, ensuring compatibility across different
8 * machine architectures. The functions manage buffer operations, memory allocation, and MPI
9 * communication to facilitate efficient and accurate data retrieval in parallel processing environments.
10 *
11 * @copyright
12 * - (c) 2002 The University of Chicago, as Operator of Argonne National Laboratory.
13 * - (c) 2002 The Regents of the University of California, as Operator of Los Alamos National Laboratory.
14 *
15 * @license
16 * This file is distributed under the terms of the Software License Agreement
17 * found in the file LICENSE included with this distribution.
18 *
19 * @authors
20 * H. Shang
21 * M. Borland
22 * R. Soliday
23 */
24
25#include "mdb.h"
26#include "SDDS.h"
27
28static int32_t defaultStringLength = SDDS_MPI_STRING_COLUMN_LEN;
29static int32_t number_of_string_truncated = 0;
30static int32_t defaultTitleBufferSize = 2400000;
31static int32_t defaultReadBufferSize = 4000000;
32static int32_t defaultWriteBufferSize = 0;
33
34#if MPI_DEBUG
35static FILE *fpdeb = NULL;
36#endif
37
38/**
39 * @brief Set the default read buffer size for SDDS.
40 *
41 * This function updates the default buffer size used for reading SDDS binary data.
42 *
43 * @param newSize The new buffer size to set. Must be greater than zero.
44 * If newSize is less than or equal to zero, the current default
45 * read buffer size is returned without modifying it.
46 * @return The previous default read buffer size.
47 */
48int32_t SDDS_SetDefaultReadBufferSize(int32_t newSize) {
49 int32_t previous;
50 if (newSize <= 0)
51 return defaultReadBufferSize;
52 previous = defaultReadBufferSize;
53 defaultReadBufferSize = newSize;
54 return previous;
55}
56
57/**
58 * @brief Set the default write buffer size for SDDS.
59 *
60 * This function updates the default buffer size used for writing SDDS binary data.
61 *
62 * @param newSize The new buffer size to set. Must be greater than zero.
63 * If newSize is less than or equal to zero, the current default
64 * write buffer size is returned without modifying it.
65 * @return The previous default write buffer size.
66 */
67int32_t SDDS_SetDefaultWriteBufferSize(int32_t newSize) {
68 int32_t previous;
69 if (newSize <= 0)
70 return defaultWriteBufferSize;
71 previous = defaultWriteBufferSize;
72 defaultWriteBufferSize = newSize;
73 return previous;
74}
75
76/**
77 * @brief Set the default title buffer size for SDDS.
78 *
79 * This function updates the default buffer size used for storing SDDS titles.
80 *
81 * @param newSize The new buffer size to set. Must be greater than zero.
82 * If newSize is less than or equal to zero, the current default
83 * title buffer size is returned without modifying it.
84 * @return The previous default title buffer size.
85 */
86int32_t SDDS_SetDefaultTitleBufferSize(int32_t newSize) {
87 int32_t previous;
88 if (newSize <= 0)
89 return defaultTitleBufferSize;
90 previous = defaultTitleBufferSize;
91 defaultTitleBufferSize = newSize;
92 return previous;
93}
94
95/**
96 * @brief Check the number of truncated strings.
97 *
98 * This function returns the number of strings that have been truncated
99 * due to exceeding the default string length.
100 *
101 * @return The number of truncated strings.
102 */
104 return number_of_string_truncated;
105}
106
107/**
108 * @brief Increment the truncated strings counter.
109 *
110 * This function increments the count of strings that have been truncated.
111 */
113 number_of_string_truncated++;
114}
115
116/**
117 * @brief Set the default string length for SDDS.
118 *
119 * This function updates the default maximum length for string columns in SDDS.
120 *
121 * @param newValue The new string length to set. Must be non-negative.
122 * If newValue is negative, the current default string length
123 * is returned without modifying it.
124 * @return The previous default string length.
125 */
126int32_t SDDS_SetDefaultStringLength(int32_t newValue) {
127 int32_t previous;
128 if (newValue < 0)
129 return defaultStringLength;
130 previous = defaultStringLength;
131 defaultStringLength = newValue;
132 return previous;
133}
134
135/**
136 * @brief Write an SDDS binary page using MPI.
137 *
138 * This function writes an SDDS dataset as a binary page using MPI.
139 * If MPI_DEBUG is enabled, it logs debugging information.
140 *
141 * @param SDDS_dataset Pointer to the SDDS_DATASET structure to write.
142 * @return 1 on success, 0 on failure.
143 */
145#if MPI_DEBUG
146 logDebug("SDDS_MPI_WriteBinaryPage", SDDS_dataset);
147#endif
148 /* usleep(1000000); Doesn't solve problem of corrupted data */
149 return SDDS_MPI_WriteContinuousBinaryPage(SDDS_dataset);
150}
151
152/**
153 * @brief Write a binary string to an SDDS dataset using MPI.
154 *
155 * This function writes a string to the SDDS dataset in binary format using MPI.
156 * If the provided string is NULL, a dummy empty string is written instead.
157 *
158 * @param SDDS_dataset Pointer to the SDDS_DATASET structure.
159 * @param string The string to write. If NULL, an empty string is written.
160 * @return 1 on success, 0 on failure.
161 */
162int32_t SDDS_MPI_WriteBinaryString(SDDS_DATASET *SDDS_dataset, char *string) {
163 static char *dummy_string = "";
164 int32_t length;
165
166#if MPI_DEBUG
167 logDebug("SDDS_MPI_WriteBinaryString", SDDS_dataset);
168#endif
169
170 if (!string)
171 string = dummy_string;
172
173 length = strlen(string);
174 if (!SDDS_MPI_BufferedWrite(&length, sizeof(length), SDDS_dataset))
175 return 0;
176 if (length && !SDDS_MPI_BufferedWrite(string, sizeof(*string) * length, SDDS_dataset))
177 return 0;
178 return 1;
179}
180
181/**
182 * @brief Write a non-native binary string to an SDDS dataset using MPI.
183 *
184 * This function writes a string to the SDDS dataset in non-native binary format
185 * using MPI, handling byte swapping as necessary. If the provided string is NULL,
186 * a dummy empty string is written instead.
187 *
188 * @param SDDS_dataset Pointer to the SDDS_DATASET structure.
189 * @param string The string to write. If NULL, an empty string is written.
190 * @return 1 on success, 0 on failure.
191 */
192int32_t SDDS_MPI_WriteNonNativeBinaryString(SDDS_DATASET *SDDS_dataset, char *string) {
193 static char *dummy_string = "";
194 int32_t length;
195
196#if MPI_DEBUG
197 logDebug("SDDS_MPI_WriteNonNativeBinaryString", SDDS_dataset);
198#endif
199
200 if (!string)
201 string = dummy_string;
202
203 length = strlen(string);
204 SDDS_SwapLong(&length);
205 if (!SDDS_MPI_BufferedWrite(&length, sizeof(length), SDDS_dataset))
206 return 0;
207 SDDS_SwapLong(&length);
208 if (length && !SDDS_MPI_BufferedWrite(string, sizeof(*string) * length, SDDS_dataset))
209 return 0;
210 return 1;
211}
212
213/**
214 * @brief Write binary parameters of an SDDS dataset using MPI.
215 *
216 * This function writes all non-fixed parameters of the SDDS dataset in binary format
217 * using MPI. String parameters are written using SDDS_MPI_WriteBinaryString,
218 * and other types are written directly to the buffer.
219 *
220 * Only the master processor should call this function to write SDDS headers,
221 * parameters, and arrays.
222 *
223 * @param SDDS_dataset Pointer to the SDDS_DATASET structure.
224 * @return 1 on success, 0 on failure.
225 */
227 SDDS_LAYOUT *layout;
228 int32_t i;
229
230#if MPI_DEBUG
231 logDebug("SDDS_MPI_WriteBinaryParameters", SDDS_dataset);
232#endif
233
234 /*should only master processors write SDDS hearder,parameters and arrays */
235 if (!SDDS_CheckDataset(SDDS_dataset, "SDDS_MPI_WriteBinaryParameters"))
236 return (0);
237 layout = &SDDS_dataset->layout;
238 for (i = 0; i < layout->n_parameters; i++) {
239 if (layout->parameter_definition[i].fixed_value)
240 continue;
241 if (layout->parameter_definition[i].type == SDDS_STRING) {
242 if (!SDDS_MPI_WriteBinaryString(SDDS_dataset, *(char **)SDDS_dataset->parameter[i]))
243 return 0;
244 } else {
245 if (!SDDS_MPI_BufferedWrite(SDDS_dataset->parameter[i], SDDS_type_size[layout->parameter_definition[i].type - 1], SDDS_dataset))
246 return 0;
247 }
248 }
249 return (1);
250}
251
252/**
253 * @brief Write non-native binary parameters of an SDDS dataset using MPI.
254 *
255 * This function writes all non-fixed parameters of the SDDS dataset in non-native
256 * binary format using MPI, handling byte swapping as necessary.
257 * String parameters are written using SDDS_MPI_WriteNonNativeBinaryString,
258 * and other types are written directly to the buffer.
259 *
260 * Only the master processor should call this function to write SDDS headers,
261 * parameters, and arrays.
262 *
263 * @param SDDS_dataset Pointer to the SDDS_DATASET structure.
264 * @return 1 on success, 0 on failure.
265 */
267 SDDS_LAYOUT *layout;
268 int32_t i;
269
270#if MPI_DEBUG
271 logDebug("SDDS_MPI_WriteNonNativeBinaryParameters", SDDS_dataset);
272#endif
273
274 /*should only master processors write SDDS hearder,parameters and arrays */
275 if (!SDDS_CheckDataset(SDDS_dataset, "SDDS_MPI_WriteBinaryParameters"))
276 return (0);
277 layout = &SDDS_dataset->layout;
278 SDDS_SwapEndsParameterData(SDDS_dataset);
279 for (i = 0; i < layout->n_parameters; i++) {
280 if (layout->parameter_definition[i].fixed_value)
281 continue;
282 if (layout->parameter_definition[i].type == SDDS_STRING) {
283 if (!SDDS_MPI_WriteNonNativeBinaryString(SDDS_dataset, *(char **)SDDS_dataset->parameter[i])) {
284 SDDS_SwapEndsParameterData(SDDS_dataset);
285 return 0;
286 }
287 } else {
288 if (!SDDS_MPI_BufferedWrite(SDDS_dataset->parameter[i], SDDS_type_size[layout->parameter_definition[i].type - 1], SDDS_dataset)) {
289 SDDS_SwapEndsParameterData(SDDS_dataset);
290 return 0;
291 }
292 }
293 }
294 SDDS_SwapEndsParameterData(SDDS_dataset);
295 return (1);
296}
297
298/**
299 * @brief Write binary arrays of an SDDS dataset using MPI.
300 *
301 * This function writes all arrays of the SDDS dataset in binary format using MPI.
302 * It handles different types of arrays, including strings and fixed-size data types.
303 *
304 * Only the master processor should call this function to write SDDS headers,
305 * parameters, and arrays.
306 *
307 * @param SDDS_dataset Pointer to the SDDS_DATASET structure.
308 * @return 1 on success, 0 on failure.
309 */
311 int32_t i, j, zero = 0, writeSize = 0;
312 SDDS_LAYOUT *layout;
313
314 /*only the master processor write SDDS header, parameters and arrays */
315 if (!SDDS_CheckDataset(SDDS_dataset, "SDDS_MPI_WriteBinaryArray"))
316 return (0);
317 layout = &SDDS_dataset->layout;
318 for (i = 0; i < layout->n_arrays; i++) {
319 if (!SDDS_dataset->array[i].dimension) {
320 for (j = 0; j < layout->array_definition[i].dimensions; j++) {
321 if (!SDDS_MPI_BufferedWrite(&zero, sizeof(zero), SDDS_dataset)) {
322 SDDS_SetError("Unable to write null array--failure writing dimensions (SDDS_MPI_WriteBinaryArrays)");
323 return 0;
324 }
325 }
326 continue;
327 }
328 writeSize = sizeof(*(SDDS_dataset->array)[i].dimension) * layout->array_definition[i].dimensions;
329 if (!SDDS_MPI_BufferedWrite(SDDS_dataset->array[i].dimension, writeSize, SDDS_dataset)) {
330 SDDS_SetError("Unable to write arrays--failure writing dimensions (SDDS_MPI_WriteBinaryArrays)");
331 return (0);
332 }
333 if (layout->array_definition[i].type == SDDS_STRING) {
334 for (j = 0; j < SDDS_dataset->array[i].elements; j++) {
335 if (!SDDS_MPI_WriteBinaryString(SDDS_dataset, ((char **)SDDS_dataset->array[i].data)[j])) {
336 SDDS_SetError("Unable to write arrays--failure writing string (SDDS_WriteBinaryArrays)");
337 return (0);
338 }
339 }
340 } else {
341 writeSize = SDDS_type_size[layout->array_definition[i].type - 1] * SDDS_dataset->array[i].elements;
342 if (!SDDS_MPI_BufferedWrite(SDDS_dataset->array[i].data, writeSize, SDDS_dataset)) {
343 SDDS_SetError("Unable to write arrays--failure writing values (SDDS_MPI_WriteBinaryArrays)");
344 return (0);
345 }
346 }
347 }
348 return (1);
349}
350
351/**
352 * @brief Write non-native binary arrays of an SDDS dataset using MPI.
353 *
354 * This function writes all arrays of the SDDS dataset in non-native binary format
355 * using MPI, handling byte swapping as necessary. It handles different types of
356 * arrays, including strings and fixed-size data types.
357 *
358 * Only the master processor should call this function to write SDDS headers,
359 * parameters, and arrays.
360 *
361 * @param SDDS_dataset Pointer to the SDDS_DATASET structure.
362 * @return 1 on success, 0 on failure.
363 */
365 int32_t i, j, zero = 0, writeSize = 0;
366 SDDS_LAYOUT *layout;
367
368 /*only the master processor write SDDS header, parameters and arrays */
369
370 if (!SDDS_CheckDataset(SDDS_dataset, "SDDS_MPI_WriteBinaryArray"))
371 return (0);
372 SDDS_SwapEndsArrayData(SDDS_dataset);
373 layout = &SDDS_dataset->layout;
374 for (i = 0; i < layout->n_arrays; i++) {
375 if (!SDDS_dataset->array[i].dimension) {
376 for (j = 0; j < layout->array_definition[i].dimensions; j++) {
377 if (!SDDS_MPI_BufferedWrite(&zero, sizeof(zero), SDDS_dataset)) {
378 SDDS_SetError("Unable to write null array--failure writing dimensions (SDDS_MPI_WriteBinaryArrays)");
379 SDDS_SwapEndsArrayData(SDDS_dataset);
380 return 0;
381 }
382 }
383 continue;
384 }
385 writeSize = sizeof(*(SDDS_dataset->array)[i].dimension) * layout->array_definition[i].dimensions;
386 if (!SDDS_MPI_BufferedWrite(SDDS_dataset->array[i].dimension, writeSize, SDDS_dataset)) {
387 SDDS_SetError("Unable to write arrays--failure writing dimensions (SDDS_MPI_WriteBinaryArrays)");
388 SDDS_SwapEndsArrayData(SDDS_dataset);
389 return (0);
390 }
391 if (layout->array_definition[i].type == SDDS_STRING) {
392 for (j = 0; j < SDDS_dataset->array[i].elements; j++) {
393 if (!SDDS_MPI_WriteNonNativeBinaryString(SDDS_dataset, ((char **)SDDS_dataset->array[i].data)[j])) {
394 SDDS_SetError("Unable to write arrays--failure writing string (SDDS_WriteBinaryArrays)");
395 SDDS_SwapEndsArrayData(SDDS_dataset);
396 return (0);
397 }
398 }
399 } else {
400 writeSize = SDDS_type_size[layout->array_definition[i].type - 1] * SDDS_dataset->array[i].elements;
401 if (!SDDS_MPI_BufferedWrite(SDDS_dataset->array[i].data, writeSize, SDDS_dataset)) {
402 SDDS_SetError("Unable to write arrays--failure writing values (SDDS_MPI_WriteBinaryArrays)");
403 SDDS_SwapEndsArrayData(SDDS_dataset);
404 return (0);
405 }
406 }
407 }
408 SDDS_SwapEndsArrayData(SDDS_dataset);
409 return (1);
410}
411
412static long SDDS_MPI_write_kludge_usleep = 0;
413/**
414 * @brief Set the write kludge usleep duration.
415 *
416 * This function sets the duration (in microseconds) for the write kludge
417 * sleep operation. This is used to fix write issues in certain test cases
418 * by introducing a delay after writing a binary row.
419 *
420 * @param value The number of microseconds to sleep after writing a row.
421 */
423 SDDS_MPI_write_kludge_usleep = value;
424}
425
426static short SDDS_MPI_force_file_sync = 0;
427/**
428 * @brief Set the file synchronization flag.
429 *
430 * This function enables or disables forced file synchronization after writing
431 * binary rows. This can help prevent data corruption by ensuring data is flushed
432 * to the file system.
433 *
434 * @param value A short integer where non-zero enables file synchronization,
435 * and zero disables it.
436 */
437void SDDS_MPI_SetFileSync(short value) {
438 SDDS_MPI_force_file_sync = value;
439}
440
441/**
442 * @brief Write a binary row to an SDDS dataset using MPI.
443 *
444 * This function writes a specific row of data from the SDDS dataset in binary
445 * format using MPI. It handles different column types, including strings with
446 * truncation based on the default string length. Optional write kludge delays
447 * and file synchronization can be applied based on configuration.
448 *
449 * @param SDDS_dataset Pointer to the SDDS_DATASET structure.
450 * @param row The row index to write.
451 * @return 1 on success, 0 on failure.
452 */
453int32_t SDDS_MPI_WriteBinaryRow(SDDS_DATASET *SDDS_dataset, int64_t row) {
454 int32_t size, type;
455 int64_t i;
456 SDDS_LAYOUT *layout;
457 /*char buff[defaultStringLength+1], format[256]; */
458 char *buff;
459 char format[256];
460
461#if MPI_DEBUG
462 logDebug("SDDS_MPI_WriteBinaryRow", SDDS_dataset);
463#endif
464
465 if (!SDDS_CheckDataset(SDDS_dataset, "SDDS_WriteBinaryRow"))
466 return (0);
467
468 layout = &SDDS_dataset->layout;
469
470 sprintf(format, "%%-%ds", defaultStringLength);
471 if (!(buff = malloc(sizeof(*buff) * (defaultStringLength + 1)))) {
472 SDDS_SetError("Can not allocate memory in SDDS_MPI_WriteBinaryRow!");
473 return 0;
474 }
475 buff[defaultStringLength] = 0;
476 for (i = 0; i < layout->n_columns; i++) {
477 type = layout->column_definition[i].type;
478 size = SDDS_type_size[type - 1];
479
480 if (type == SDDS_STRING) {
481 if (strlen(*((char **)SDDS_dataset->data[i] + row)) <= defaultStringLength)
482 sprintf(buff, format, *((char **)SDDS_dataset->data[i] + row));
483 else {
484 strncpy(buff, *((char **)SDDS_dataset->data[i] + row), defaultStringLength);
485 number_of_string_truncated++;
486 }
487 if (!SDDS_MPI_WriteBinaryString(SDDS_dataset, buff)) {
488 free(buff);
489 return 0;
490 }
491 } else {
492 size = SDDS_type_size[type - 1];
493 if (!SDDS_MPI_BufferedWrite((char *)SDDS_dataset->data[i] + row * size, size, SDDS_dataset)) {
494 free(buff);
495 return 0;
496 }
497 }
498 }
499 free(buff);
500 if (SDDS_MPI_write_kludge_usleep)
501 usleepSystemIndependent(SDDS_MPI_write_kludge_usleep); /* This fixes write issue in test case. No data corruption observed. */
502 if (SDDS_MPI_force_file_sync)
503 MPI_File_sync(SDDS_dataset->MPI_dataset->MPI_file); /* This also seems to fix it. */
504 return (1);
505}
506
507/**
508 * @brief Write a non-native binary row to an SDDS dataset using MPI.
509 *
510 * This function writes a specific row of data from the SDDS dataset in non-native
511 * binary format using MPI, handling byte swapping as necessary. It manages different
512 * column types, including strings with truncation based on the default string length.
513 * Optional write kludge delays and file synchronization can be applied based on configuration.
514 *
515 * @param SDDS_dataset Pointer to the SDDS_DATASET structure.
516 * @param row The row index to write.
517 * @return 1 on success, 0 on failure.
518 */
519int32_t SDDS_MPI_WriteNonNativeBinaryRow(SDDS_DATASET *SDDS_dataset, int64_t row) {
520 int32_t size, type;
521 int64_t i;
522 SDDS_LAYOUT *layout;
523 /*char buff[defaultStringLength+1], format[256]; */
524 char *buff;
525 char format[256];
526#if MPI_DEBUG
527 logDebug("SDDS_MPI_WriteNonNativeBinaryRow", SDDS_dataset);
528#endif
529
530 if (!SDDS_CheckDataset(SDDS_dataset, "SDDS_WriteBinaryRow"))
531 return (0);
532
533 layout = &SDDS_dataset->layout;
534
535 sprintf(format, "%%-%ds", defaultStringLength);
536 if (!(buff = malloc(sizeof(*buff) * (defaultStringLength + 1)))) {
537 SDDS_SetError("Can not allocate memory in SDDS_MPI_WriteNonNativeBinaryRow!");
538 return 0;
539 }
540 buff[defaultStringLength] = 0;
541 for (i = 0; i < layout->n_columns; i++) {
542 type = layout->column_definition[i].type;
543 size = SDDS_type_size[type - 1];
544
545 if (type == SDDS_STRING) {
546 if (strlen(*((char **)SDDS_dataset->data[i] + row)) <= defaultStringLength)
547 sprintf(buff, format, *((char **)SDDS_dataset->data[i] + row));
548 else {
549 strncpy(buff, *((char **)SDDS_dataset->data[i] + row), defaultStringLength);
550 number_of_string_truncated++;
551 }
552 if (!SDDS_MPI_WriteNonNativeBinaryString(SDDS_dataset, buff)) {
553 free(buff);
554 return 0;
555 }
556 } else {
557 size = SDDS_type_size[type - 1];
558 if (!SDDS_MPI_BufferedWrite((char *)SDDS_dataset->data[i] + row * size, size, SDDS_dataset)) {
559 free(buff);
560 return 0;
561 }
562 }
563 }
564 free(buff);
565 if (SDDS_MPI_write_kludge_usleep)
566 usleepSystemIndependent(SDDS_MPI_write_kludge_usleep); /* This fixes write issue in test case. No data corruption observed. */
567 if (SDDS_MPI_force_file_sync)
568 MPI_File_sync(SDDS_dataset->MPI_dataset->MPI_file); /* This also seems to fix it. */
569 return (1);
570}
571
572/**
573 * @brief Get the total size of all columns in an SDDS dataset.
574 *
575 * This function calculates the total size in bytes required to store all columns
576 * of the SDDS dataset in binary format. For string columns, it includes space
577 * for the string length and the fixed maximum string length.
578 *
579 * @param SDDS_dataset Pointer to the SDDS_DATASET structure.
580 * @return The total size of all columns in bytes.
581 */
582MPI_Offset SDDS_MPI_Get_Column_Size(SDDS_DATASET *SDDS_dataset) {
583 int64_t i;
584 MPI_Offset column_offset = 0;
585 SDDS_LAYOUT *layout;
586
587 layout = &SDDS_dataset->layout;
588 for (i = 0; i < layout->n_columns; i++) {
589 if (layout->column_definition[i].type == SDDS_STRING)
590 /* for string type column, the fixed column size defined by user use SDDS_SetDefaultStringLength(value),
591 and the length of string is written before the string */
592 column_offset += sizeof(int32_t) + defaultStringLength * sizeof(char);
593 else
594 column_offset += SDDS_type_size[layout->column_definition[i].type - 1];
595 }
596 return column_offset;
597}
598
599/**
600 * @brief Buffered write to an SDDS dataset using MPI.
601 *
602 * This function writes data to the SDDS dataset using a buffer. If the buffer has
603 * sufficient space, the data is copied into the buffer. Otherwise, the buffer is
604 * flushed to the file, and the new data is either written directly or stored in the buffer.
605 *
606 * @param target Pointer to the data to write.
607 * @param targetSize The size of the data to write, in bytes.
608 * @param SDDS_dataset Pointer to the SDDS_DATASET structure.
609 * @return 1 on success, 0 on failure.
610 */
611int32_t SDDS_MPI_BufferedWrite(void *target, int64_t targetSize, SDDS_DATASET *SDDS_dataset) {
612 SDDS_FILEBUFFER *fBuffer;
613 MPI_DATASET *MPI_dataset;
614 int32_t mpi_code;
615
616#if MPI_DEBUG
617 logDebug("SDDS_MPI_BufferedWrite", SDDS_dataset);
618#endif
619 MPI_dataset = SDDS_dataset->MPI_dataset;
620 fBuffer = &(SDDS_dataset->fBuffer);
621
622 if (!fBuffer->bufferSize) {
623 if ((mpi_code = MPI_File_write(MPI_dataset->MPI_file, target, targetSize, MPI_BYTE, MPI_STATUS_IGNORE)) != MPI_SUCCESS) {
624 SDDS_MPI_GOTO_ERROR(stderr, "SDDS_MPI_WriteBufferedWrite(MPI_File_write_at failed)", mpi_code, 0);
625 return 0;
626 }
627 return 1;
628 }
629 if ((fBuffer->bytesLeft -= targetSize) >= 0) {
630 memcpy((char *)fBuffer->data, (char *)target, targetSize);
631 fBuffer->data += targetSize;
632#ifdef DEBUG
633 fprintf(stderr, "SDDS_MPI_BufferedWrite of %" PRId64 " bytes done in-memory, %" PRId64 " bytes left\n", targetSize, fBuffer->bytesLeft);
634#endif
635 return 1;
636 } else {
637 int64_t lastLeft;
638 /* add back what was subtracted in test above.
639 * lastLeft is the number of bytes left in the buffer before doing anything
640 * and also the number of bytes from the users data that get copied into the buffer.
641 */
642 lastLeft = (fBuffer->bytesLeft += targetSize);
643 /* copy part of the data into the buffer and write the buffer out */
644 memcpy((char *)fBuffer->data, (char *)target, (size_t)fBuffer->bytesLeft);
645 if ((mpi_code = MPI_File_write(MPI_dataset->MPI_file, fBuffer->buffer, (int)(fBuffer->bufferSize), MPI_BYTE, MPI_STATUS_IGNORE)) != MPI_SUCCESS) {
646 SDDS_MPI_GOTO_ERROR(stderr, "SDDS_MPI_WriteBufferedWrite(MPI_File_write_at failed)", mpi_code, 0);
647 return 0;
648 }
649
650 /* reset the data pointer and the bytesLeft value.
651 * also, determine if the remaining data is too large for the buffer.
652 * if so, just write it out.
653 */
654 fBuffer->data = fBuffer->buffer;
655 if ((targetSize -= lastLeft) > (fBuffer->bytesLeft = fBuffer->bufferSize)) {
656 if ((mpi_code = MPI_File_write_at(MPI_dataset->MPI_file, (MPI_Offset)(MPI_dataset->file_offset), (char *)target + lastLeft, targetSize, MPI_BYTE, MPI_STATUS_IGNORE)) != MPI_SUCCESS) {
657 SDDS_MPI_GOTO_ERROR(stderr, "SDDS_MPI_WriteBufferedWrite(MPI_File_write_at failed)", mpi_code, 0);
658 return 0;
659 }
660 return 1;
661 }
662 /* copy remaining data into the buffer.
663 * could do this with a recursive call, but this is more efficient.
664 */
665 memcpy((char *)fBuffer->data, (char *)target + lastLeft, targetSize);
666 fBuffer->data += targetSize;
667 fBuffer->bytesLeft -= targetSize;
668 return 1;
669 }
670}
671
672/**
673 * @brief Buffered write all to an SDDS dataset using MPI.
674 *
675 * This function writes all data to the SDDS dataset using a buffer, ensuring that
676 * all data is written even if the buffer needs to be flushed multiple times.
677 *
678 * @param target Pointer to the data to write.
679 * @param targetSize The size of the data to write, in bytes.
680 * @param SDDS_dataset Pointer to the SDDS_DATASET structure.
681 * @return 1 on success, 0 on failure.
682 */
683int32_t SDDS_MPI_BufferedWriteAll(void *target, int64_t targetSize, SDDS_DATASET *SDDS_dataset) {
684 SDDS_FILEBUFFER *fBuffer;
685 MPI_DATASET *MPI_dataset;
686 int32_t mpi_code;
687
688#if MPI_DEBUG
689 logDebug("SDDS_MPI_BufferedWriteAll", SDDS_dataset);
690#endif
691 MPI_dataset = SDDS_dataset->MPI_dataset;
692 fBuffer = &(SDDS_dataset->fBuffer);
693
694 if (!fBuffer->bufferSize) {
695 if ((mpi_code = MPI_File_write_all(MPI_dataset->MPI_file, target, targetSize, MPI_BYTE, MPI_STATUS_IGNORE)) != MPI_SUCCESS) {
696 SDDS_MPI_GOTO_ERROR(stderr, "SDDS_MPI_WriteBufferedWrite(MPI_File_write_at failed)", mpi_code, 0);
697 return 0;
698 }
699 return 1;
700 }
701 if ((fBuffer->bytesLeft -= targetSize) >= 0) {
702 memcpy((char *)fBuffer->data, (char *)target, targetSize);
703 fBuffer->data += targetSize;
704#ifdef DEBUG
705 fprintf(stderr, "SDDS_MPI_BufferedWrite of %" PRId64 " bytes done in-memory, %" PRId64 " bytes left\n", targetSize, fBuffer->bytesLeft);
706#endif
707 return 1;
708 } else {
709 int64_t lastLeft;
710 /* add back what was subtracted in test above.
711 * lastLeft is the number of bytes left in the buffer before doing anything
712 * and also the number of bytes from the users data that get copied into the buffer.
713 */
714 lastLeft = (fBuffer->bytesLeft += targetSize);
715 /* copy part of the data into the buffer and write the buffer out */
716 memcpy((char *)fBuffer->data, (char *)target, (size_t)fBuffer->bytesLeft);
717 if ((mpi_code = MPI_File_write_all(MPI_dataset->MPI_file, fBuffer->buffer, (int)(fBuffer->bufferSize), MPI_BYTE, MPI_STATUS_IGNORE)) != MPI_SUCCESS) {
718 SDDS_MPI_GOTO_ERROR(stderr, "SDDS_MPI_WriteBufferedWrite(MPI_File_write_at failed)", mpi_code, 0);
719 return 0;
720 }
721
722 /* reset the data pointer and the bytesLeft value.
723 * also, determine if the remaining data is too large for the buffer.
724 * if so, just write it out.
725 */
726 fBuffer->data = fBuffer->buffer;
727 if ((targetSize -= lastLeft) > (fBuffer->bytesLeft = fBuffer->bufferSize)) {
728 if ((mpi_code = MPI_File_write_all(MPI_dataset->MPI_file, (char *)target + lastLeft, targetSize, MPI_BYTE, MPI_STATUS_IGNORE)) != MPI_SUCCESS) {
729 SDDS_MPI_GOTO_ERROR(stderr, "SDDS_MPI_WriteBufferedWrite(MPI_File_write_at failed)", mpi_code, 0);
730 return 0;
731 }
732 return 1;
733 }
734 /* copy remaining data into the buffer.
735 * could do this with a recursive call, but this is more efficient.
736 */
737 memcpy((char *)fBuffer->data, (char *)target + lastLeft, targetSize);
738 fBuffer->data += targetSize;
739 fBuffer->bytesLeft -= targetSize;
740 return 1;
741 }
742}
743
744/**
745 * @brief Flush the buffer by writing any remaining data to the MPI file.
746 *
747 * This function ensures that any data remaining in the buffer is written to the MPI file.
748 * It handles error checking and resets the buffer pointers upon successful write.
749 *
750 * @param SDDS_dataset Pointer to the SDDS_DATASET structure.
751 * @return
752 * - `1` on successful flush.
753 * - `0` on failure.
754 */
755int32_t SDDS_MPI_FlushBuffer(SDDS_DATASET *SDDS_dataset) {
756 SDDS_FILEBUFFER *fBuffer;
757 MPI_DATASET *MPI_dataset;
758 int64_t writeBytes;
759 int32_t mpi_code;
760
761#if MPI_DEBUG
762 logDebug("SDDS_MPI_FlushBuffer", SDDS_dataset);
763#endif
764
765 MPI_dataset = SDDS_dataset->MPI_dataset;
766 fBuffer = &(SDDS_dataset->fBuffer);
767
768 if (!fBuffer->bufferSize)
769 return 1;
770
771 if ((writeBytes = fBuffer->bufferSize - fBuffer->bytesLeft)) {
772 if (writeBytes < 0) {
773 SDDS_SetError("Unable to flush buffer: negative byte count (SDDS_FlushBuffer).");
774 return 0;
775 }
776#ifdef DEBUG
777 fprintf(stderr, "Writing %" PRId64 " bytes to disk\n", writeBytes);
778#endif
779 if ((mpi_code = MPI_File_write(MPI_dataset->MPI_file, fBuffer->buffer, writeBytes, MPI_BYTE, MPI_STATUS_IGNORE)) != MPI_SUCCESS) {
780 SDDS_MPI_GOTO_ERROR(stderr, "SDDS_MPI_FlushBuffer(MPI_File_write_at failed)", mpi_code, 0);
781 return 0;
782 }
783 fBuffer->bytesLeft = fBuffer->bufferSize;
784 fBuffer->data = fBuffer->buffer;
785 }
786 return 1;
787}
788
789/**
790 * @brief Count the number of rows marked as "of interest" within a specified range.
791 *
792 * This function iterates through the rows of the SDDS dataset from `start_row` to `end_row`
793 * and counts how many rows have their `row_flag` set to indicate they are of interest.
794 *
795 * @param SDDS_dataset Pointer to the SDDS_DATASET structure.
796 * @param start_row The starting row index (inclusive) to begin counting.
797 * @param end_row The ending row index (exclusive) to stop counting.
798 * @return The total number of rows marked as of interest within the specified range.
799 */
800int64_t SDDS_MPI_CountRowsOfInterest(SDDS_DATASET *SDDS_dataset, int64_t start_row, int64_t end_row) {
801 int64_t i, rows = 0;
802 for (i = start_row; i < end_row; i++) {
803 if (i > SDDS_dataset->n_rows - 1)
804 break;
805 if (SDDS_dataset->row_flag[i])
806 rows++;
807 }
808 return rows;
809}
810
811/**
812 * @brief Get the total number of rows across all MPI processes.
813 *
814 * This function uses MPI_Reduce to sum the number of rows (`n_rows`) from all processes
815 * and returns the total number of rows to the root process (process 0).
816 *
817 * @param SDDS_dataset Pointer to the SDDS_DATASET structure.
818 * @return The total number of rows across all MPI processes. Only valid on the root process.
819 */
820int64_t SDDS_MPI_GetTotalRows(SDDS_DATASET *SDDS_dataset) {
821 int64_t total_rows;
822 MPI_Reduce(&(SDDS_dataset->n_rows), &total_rows, 1, MPI_INT64_T, MPI_SUM, 0, SDDS_dataset->MPI_dataset->comm);
823 return total_rows;
824}
825
826/**
827 * @brief Write a non-native binary page of the SDDS dataset using MPI.
828 *
829 * This function handles the writing of an entire SDDS dataset page in non-native binary format.
830 * It manages buffer allocation, handles byte swapping for non-native formats, and ensures
831 * that all parameters and arrays are correctly written. It also coordinates writing across
832 * multiple MPI processes.
833 *
834 * @param SDDS_dataset Pointer to the SDDS_DATASET structure.
835 * @return
836 * - `1` on successful write.
837 * - `0` on failure.
838 */
840 int64_t row, prev_rows, i, total_rows, fixed_rows, rows;
841 int mpi_code, type = 0;
842 MPI_Offset column_offset, rowcount_offset, offset;
843 int64_t *n_rows = NULL;
844 MPI_Status status;
845 int32_t min32 = INT32_MIN;
846
847 MPI_DATASET *MPI_dataset = NULL;
848 SDDS_FILEBUFFER *fBuffer;
849
850#if MPI_DEBUG
851 logDebug("SDDS_MPI_WriteNonNativeBinaryPage", SDDS_dataset);
852#endif
853
854 MPI_dataset = SDDS_dataset->MPI_dataset;
855
856#if MPI_DEBUG
857
858#endif
859
860 if (!SDDS_CheckDataset(SDDS_dataset, "SDDS_MPI_WriteNonNativeBinaryPage"))
861 return (0);
862
863 fBuffer = &SDDS_dataset->fBuffer;
864 if (SDDS_dataset->layout.data_mode.column_major)
865 /*write by column ignores the row flag and buffer is not needed for writing data by column */
866 rows = SDDS_dataset->n_rows;
867 else {
868 rows = SDDS_CountRowsOfInterest(SDDS_dataset);
869 if (!fBuffer->buffer) {
870 fBuffer->bufferSize = defaultWriteBufferSize;
871 if (!(fBuffer->buffer = fBuffer->data = SDDS_Malloc(sizeof(char) * (fBuffer->bufferSize + 1)))) {
872 SDDS_SetError("Unable to do buffered read--allocation failure (SDDS_WriteNonNativeBinaryPage)");
873 return 0;
874 }
875 fBuffer->bytesLeft = fBuffer->bufferSize;
876 fBuffer->data[0] = 0;
877 }
878 }
879 if (MPI_dataset->n_page >= 1)
880 MPI_File_set_view(MPI_dataset->MPI_file, MPI_dataset->file_offset, MPI_BYTE, MPI_BYTE, "native", MPI_INFO_NULL);
881 rowcount_offset = MPI_dataset->file_offset + SDDS_MPI_GetTitleOffset(SDDS_dataset); /* the offset before writing rows */
882 /*get total number of rows writing */
883 column_offset = MPI_dataset->column_offset;
884 if (!(n_rows = calloc(sizeof(*n_rows), MPI_dataset->n_processors))) {
885 SDDS_SetError("Memory allocation failed!");
886 return 0;
887 }
888 MPI_Allgather(&rows, 1, MPI_INT64_T, n_rows, 1, MPI_INT64_T, MPI_dataset->comm);
889 prev_rows = 0;
890 for (i = 0; i < MPI_dataset->myid; i++)
891 prev_rows += n_rows[i];
892 total_rows = 0;
893 for (i = 0; i < MPI_dataset->n_processors; i++)
894 total_rows += n_rows[i];
895 if (MPI_dataset->myid == 0) {
896 fixed_rows = total_rows;
897 if (!fBuffer->buffer) {
898 fBuffer->bufferSize = defaultWriteBufferSize;
899 if (!(fBuffer->buffer = fBuffer->data = SDDS_Malloc(sizeof(char) * (fBuffer->bufferSize + 1)))) {
900 SDDS_SetError("Unable to do buffered read--allocation failure (SDDS_WriteNonNativeBinaryPage)");
901 return 0;
902 }
903 fBuffer->bytesLeft = fBuffer->bufferSize;
904 fBuffer->data[0] = 0;
905 }
906 if (fixed_rows > INT32_MAX) {
907 SDDS_SwapLong(&min32);
908 if (!SDDS_MPI_BufferedWrite(&min32, sizeof(min32), SDDS_dataset))
909 return 0;
910 SDDS_SwapLong64(&fixed_rows);
911 if (!SDDS_MPI_BufferedWrite(&fixed_rows, sizeof(fixed_rows), SDDS_dataset))
912 return 0;
913 } else {
914 int32_t fixed_rows32;
915 fixed_rows32 = (int32_t)fixed_rows;
916 SDDS_SwapLong(&fixed_rows32);
917 if (!SDDS_MPI_BufferedWrite(&fixed_rows32, sizeof(fixed_rows32), SDDS_dataset))
918 return 0;
919 }
921 return 0;
922 /* flush buffer, write everything in the buffer to file */
923 if (!SDDS_MPI_FlushBuffer(SDDS_dataset))
924 return 0;
925 }
926 SDDS_SwapEndsColumnData(SDDS_dataset);
927 if (SDDS_dataset->layout.data_mode.column_major) {
928 /*write data by column */
929 offset = rowcount_offset;
930 for (i = 0; i < SDDS_dataset->layout.n_columns; i++) {
931 type = SDDS_dataset->layout.column_definition[i].type;
932 MPI_dataset->file_offset = offset + (MPI_Offset)prev_rows * SDDS_type_size[type - 1];
933 if (type == SDDS_STRING) {
934 SDDS_SetError("Can not write string column to SDDS3 (SDDS_MPI_WriteNonNativeBinaryPage");
935 return 0;
936 }
937 if ((mpi_code = MPI_File_set_view(MPI_dataset->MPI_file, MPI_dataset->file_offset, MPI_BYTE, MPI_BYTE, "native", MPI_INFO_NULL)) != MPI_SUCCESS) {
938 SDDS_MPI_GOTO_ERROR(stderr, "Unable to set view for read binary rows", mpi_code, 0);
939 SDDS_SetError("Unable to set view for read binary rows");
940 return 0;
941 }
942 if ((mpi_code = MPI_File_write(MPI_dataset->MPI_file, SDDS_dataset->data[i], rows * SDDS_type_size[type - 1], MPI_BYTE, &status)) != MPI_SUCCESS) {
943 SDDS_SetError("Unable to write binary columns (SDDS_MPI_WriteNonNativeBinaryPage");
944 return 0;
945 }
946
947 offset += (MPI_Offset)total_rows * SDDS_type_size[type - 1];
948 }
949 MPI_dataset->file_offset = offset;
950 } else {
951 /* now all processors write column data row by row */
952 MPI_dataset->file_offset = rowcount_offset + (MPI_Offset)prev_rows * column_offset;
953 /* set view to the position where the processor starts writing data */
954 MPI_File_set_view(MPI_dataset->MPI_file, MPI_dataset->file_offset, MPI_BYTE, MPI_BYTE, "native", MPI_INFO_NULL);
955 if (!MPI_dataset->collective_io) {
956 row = 0;
957 for (i = 0; i < SDDS_dataset->n_rows; i++) {
958 if (SDDS_dataset->row_flag[i] && !SDDS_MPI_WriteNonNativeBinaryRow(SDDS_dataset, i))
959 return 0;
960 row++;
961 }
962 /*get current file position until now */
963 SDDS_dataset->n_rows = row;
964 if (!SDDS_MPI_FlushBuffer(SDDS_dataset))
965 return 0;
966 } else {
967 if (!SDDS_MPI_CollectiveWriteByRow(SDDS_dataset))
968 return 0;
969 row = SDDS_dataset->n_rows;
970 }
971 MPI_Allreduce(&row, &total_rows, 1, MPI_INT64_T, MPI_SUM, MPI_dataset->comm);
972 MPI_dataset->file_offset = rowcount_offset + total_rows * column_offset;
973 rows = row;
974 }
975 SDDS_SwapEndsColumnData(SDDS_dataset);
976 free(n_rows);
977 /*skip checking if all data has been written for now */
978 SDDS_dataset->last_row_written = SDDS_dataset->n_rows - 1;
979 SDDS_dataset->n_rows_written = rows;
980 SDDS_dataset->writing_page = 1;
981 MPI_dataset->n_page++;
982 return 1;
983}
984
985/**
986 * @brief Write a continuous binary page of the SDDS dataset using MPI.
987 *
988 * This function writes an SDDS dataset page in binary format, handling both native
989 * and non-native byte orders based on the environment variable `SDDS_OUTPUT_ENDIANESS`.
990 * It manages buffer allocation, byte swapping, and coordinates writing across MPI processes.
991 *
992 * @param SDDS_dataset Pointer to the SDDS_DATASET structure.
993 * @return
994 * - `1` on successful write.
995 * - `0` on failure.
996 */
998 int64_t row, prev_rows, i, total_rows, fixed_rows, rows;
999 int mpi_code, type = 0;
1000 MPI_Offset column_offset, rowcount_offset, offset;
1001 int64_t *n_rows = NULL;
1002 int32_t min32 = INT32_MIN;
1003 MPI_Status status;
1004
1005 MPI_DATASET *MPI_dataset = NULL;
1006 SDDS_FILEBUFFER *fBuffer;
1007 char *outputEndianess = NULL;
1008
1009#if MPI_DEBUG
1010 logDebug("SDDS_MPI_WriteContinuousBinaryPage", SDDS_dataset);
1011#endif
1012
1013 /* usleep(10000); This doesn't really help */
1014
1015 if ((outputEndianess = getenv("SDDS_OUTPUT_ENDIANESS"))) {
1016 if (((strncmp(outputEndianess, "big", 3) == 0) && (SDDS_IsBigEndianMachine() == 0)) || ((strncmp(outputEndianess, "little", 6) == 0) && (SDDS_IsBigEndianMachine() == 1)))
1017 return SDDS_MPI_WriteNonNativeBinaryPage(SDDS_dataset);
1018 }
1019
1020 MPI_dataset = SDDS_dataset->MPI_dataset;
1021
1022 if (!SDDS_CheckDataset(SDDS_dataset, "SDDS_MPI_WriteContinuousBinaryPage"))
1023 return (0);
1024
1025 fBuffer = &SDDS_dataset->fBuffer;
1026 if (SDDS_dataset->layout.data_mode.column_major)
1027 /*write by column ignores the row flag and buffer is not needed for writing data by column */
1028 rows = SDDS_dataset->n_rows;
1029 else {
1030 rows = SDDS_CountRowsOfInterest(SDDS_dataset);
1031 if (!fBuffer->buffer) {
1032 fBuffer->bufferSize = defaultWriteBufferSize;
1033 if (!(fBuffer->buffer = fBuffer->data = SDDS_Malloc(sizeof(char) * (fBuffer->bufferSize + 1)))) {
1034 SDDS_SetError("Unable to do buffered read--allocation failure (SDDS_WriteContinuousBinaryPage)");
1035 return 0;
1036 }
1037 fBuffer->bytesLeft = fBuffer->bufferSize;
1038 fBuffer->data[0] = 0;
1039 }
1040 }
1041 if (MPI_dataset->n_page >= 1)
1042 MPI_File_set_view(MPI_dataset->MPI_file, MPI_dataset->file_offset, MPI_BYTE, MPI_BYTE, "native", MPI_INFO_NULL);
1043 rowcount_offset = MPI_dataset->file_offset + SDDS_MPI_GetTitleOffset(SDDS_dataset); /* the offset before writing rows */
1044 /*get total number of rows writing */
1045 column_offset = MPI_dataset->column_offset;
1046 if (!(n_rows = calloc(sizeof(*n_rows), MPI_dataset->n_processors))) {
1047 SDDS_SetError("Memory allocation failed!");
1048 return 0;
1049 }
1050 MPI_Allgather(&rows, 1, MPI_INT64_T, n_rows, 1, MPI_INT64_T, MPI_dataset->comm);
1051 prev_rows = 0;
1052 for (i = 0; i < MPI_dataset->myid; i++)
1053 prev_rows += n_rows[i];
1054 total_rows = 0;
1055 for (i = 0; i < MPI_dataset->n_processors; i++)
1056 total_rows += n_rows[i];
1057 if (MPI_dataset->myid == 0) {
1058 fixed_rows = total_rows;
1059 if (!fBuffer->buffer) {
1060 fBuffer->bufferSize = defaultWriteBufferSize;
1061 if (!(fBuffer->buffer = fBuffer->data = SDDS_Malloc(sizeof(char) * (fBuffer->bufferSize + 1)))) {
1062 SDDS_SetError("Unable to do buffered read--allocation failure (SDDS_WriteContinuousBinaryPage)");
1063 return 0;
1064 }
1065 fBuffer->bytesLeft = fBuffer->bufferSize;
1066 fBuffer->data[0] = 0;
1067 }
1068 if (fixed_rows > INT32_MAX) {
1069 if (!SDDS_MPI_BufferedWrite(&min32, sizeof(min32), SDDS_dataset))
1070 return 0;
1071 if (!SDDS_MPI_BufferedWrite(&fixed_rows, sizeof(fixed_rows), SDDS_dataset))
1072 return 0;
1073 } else {
1074 int32_t fixed_rows32;
1075 fixed_rows32 = (int32_t)fixed_rows;
1076 if (!SDDS_MPI_BufferedWrite(&fixed_rows32, sizeof(fixed_rows32), SDDS_dataset))
1077 return 0;
1078 }
1079 if (!SDDS_MPI_WriteBinaryParameters(SDDS_dataset) || !SDDS_MPI_WriteBinaryArrays(SDDS_dataset))
1080 return 0;
1081 /* flush buffer, write everything in the buffer to file */
1082 if (!SDDS_MPI_FlushBuffer(SDDS_dataset))
1083 return 0;
1084 }
1085
1086 if (SDDS_dataset->layout.data_mode.column_major) {
1087 /*write data by column */
1088 offset = rowcount_offset;
1089 for (i = 0; i < SDDS_dataset->layout.n_columns; i++) {
1090 type = SDDS_dataset->layout.column_definition[i].type;
1091 MPI_dataset->file_offset = offset + (MPI_Offset)prev_rows * SDDS_type_size[type - 1];
1092 if (type == SDDS_STRING) {
1093 SDDS_SetError("Can not write string column to SDDS3 (SDDS_MPI_WriteContinuousBinaryPage");
1094 return 0;
1095 }
1096 if ((mpi_code = MPI_File_set_view(MPI_dataset->MPI_file, MPI_dataset->file_offset, MPI_BYTE, MPI_BYTE, "native", MPI_INFO_NULL)) != MPI_SUCCESS) {
1097 SDDS_MPI_GOTO_ERROR(stderr, "Unable to set view for read binary rows", mpi_code, 0);
1098 SDDS_SetError("Unable to set view for read binary rows");
1099 return 0;
1100 }
1101 if ((mpi_code = MPI_File_write(MPI_dataset->MPI_file, SDDS_dataset->data[i], rows * SDDS_type_size[type - 1], MPI_BYTE, &status)) != MPI_SUCCESS) {
1102 SDDS_SetError("Unable to write binary columns (SDDS_MPI_WriteContinuousBinaryPage");
1103 return 0;
1104 }
1105
1106 offset += (MPI_Offset)total_rows * SDDS_type_size[type - 1];
1107 }
1108 MPI_dataset->file_offset = offset;
1109 } else {
1110 /* now all processors write column data row by row */
1111 MPI_dataset->file_offset = rowcount_offset + (MPI_Offset)prev_rows * column_offset;
1112 /* set view to the position where the processor starts writing data */
1113 MPI_File_set_view(MPI_dataset->MPI_file, MPI_dataset->file_offset, MPI_BYTE, MPI_BYTE, "native", MPI_INFO_NULL);
1114 if (!MPI_dataset->collective_io) {
1115 row = 0;
1116 for (i = 0; i < SDDS_dataset->n_rows; i++) {
1117 if (SDDS_dataset->row_flag[i] && !SDDS_MPI_WriteBinaryRow(SDDS_dataset, i))
1118 return 0;
1119 row++;
1120 }
1121 /*get current file position until now */
1122 SDDS_dataset->n_rows = row;
1123 if (!SDDS_MPI_FlushBuffer(SDDS_dataset))
1124 return 0;
1125 } else {
1126 if (!SDDS_MPI_CollectiveWriteByRow(SDDS_dataset))
1127 return 0;
1128 row = SDDS_dataset->n_rows;
1129 }
1130 MPI_Allreduce(&row, &total_rows, 1, MPI_INT64_T, MPI_SUM, MPI_dataset->comm);
1131 MPI_dataset->file_offset = rowcount_offset + total_rows * column_offset;
1132 rows = row;
1133 }
1134 free(n_rows);
1135 /*skip checking if all data has been written for now */
1136 SDDS_dataset->last_row_written = SDDS_dataset->n_rows - 1;
1137 SDDS_dataset->n_rows_written = rows;
1138 SDDS_dataset->writing_page = 1;
1139 MPI_dataset->n_page++;
1140
1141 return 1;
1142}
1143
1144/**
1145 * @brief Buffered read from an SDDS dataset using MPI.
1146 *
1147 * This function reads data from the SDDS dataset using a buffer. It handles cases
1148 * where sufficient data is already in the buffer and cases where additional data
1149 * needs to be read from the MPI file. It also manages end-of-file conditions.
1150 *
1151 * @param target Pointer to the buffer where the read data will be stored.
1152 * @param targetSize The size of the data to read, in bytes.
1153 * @param SDDS_dataset Pointer to the SDDS_DATASET structure.
1154 * @param fBuffer Pointer to the SDDS_FILEBUFFER structure managing the buffer.
1155 * @return
1156 * - `1` on successful read.
1157 * - `0` on partial read or failure.
1158 * - `-1` if end-of-file is reached.
1159 */
1160int32_t SDDS_MPI_BufferedRead(void *target, int64_t targetSize, SDDS_DATASET *SDDS_dataset, SDDS_FILEBUFFER *fBuffer) {
1161 int32_t mpi_code;
1162 int32_t bytesRead, count;
1163 MPI_Status status;
1164 MPI_DATASET *MPI_dataset = SDDS_dataset->MPI_dataset;
1165
1166 if (!fBuffer || !fBuffer->bufferSize) {
1167 /* just read into users buffer or seek if no buffer given */
1168 if (!target)
1169 mpi_code = MPI_File_seek(MPI_dataset->MPI_file, targetSize, MPI_SEEK_CUR);
1170 else {
1171 mpi_code = MPI_File_read(MPI_dataset->MPI_file, target, targetSize, MPI_BYTE, &status);
1172 MPI_Get_count(&status, MPI_BYTE, &bytesRead);
1173 if (!bytesRead) {
1174 MPI_dataset->end_of_file = 1;
1175 return -1;
1176 }
1177 if (bytesRead < targetSize)
1178 return 0;
1179 }
1180 if (mpi_code != MPI_SUCCESS) {
1181 SDDS_MPI_GOTO_ERROR(stderr, "SDDS_MPI_BufferedRead(MPI_File_read failed)", mpi_code, 0);
1182 return 0;
1183 }
1184 return 1;
1185 }
1186 if ((fBuffer->bytesLeft -= targetSize) >= 0) {
1187 /* sufficient data is already in the buffer */
1188 if (target) {
1189 memcpy((char *)target, (char *)fBuffer->data, targetSize);
1190 }
1191 fBuffer->data += targetSize;
1192 return 1;
1193 } else {
1194 /* need to read additional data into buffer */
1195 int64_t bytesNeeded, offset;
1196 fBuffer->bytesLeft += targetSize; /* adds back amount subtracted above */
1197 /* first, use the data that is already available. this cleans out the buffer */
1198 if ((offset = fBuffer->bytesLeft)) {
1199 /* some data is available in the buffer */
1200 if (target) {
1201 memcpy((char *)target, (char *)fBuffer->data, offset);
1202 }
1203 bytesNeeded = targetSize - offset;
1204 fBuffer->bytesLeft = 0;
1205 } else {
1206 bytesNeeded = targetSize;
1207 }
1208 fBuffer->data = fBuffer->buffer;
1209 if (fBuffer->bufferSize < bytesNeeded) {
1210 /* just read what is needed directly into user's memory or seek */
1211 if (!target)
1212 mpi_code = MPI_File_seek(MPI_dataset->MPI_file, targetSize, MPI_SEEK_CUR);
1213 else {
1214 mpi_code = MPI_File_read(MPI_dataset->MPI_file, (char *)target + offset, bytesNeeded, MPI_BYTE, &status);
1215 MPI_Get_count(&status, MPI_BYTE, &bytesRead);
1216 if (!bytesRead) {
1217 MPI_dataset->end_of_file = 1;
1218 return -1;
1219 }
1220 if (bytesRead < bytesNeeded)
1221 return 0;
1222 }
1223 if (mpi_code != MPI_SUCCESS) {
1224 SDDS_MPI_GOTO_ERROR(stderr, "SDDS_MPI_ReadBufferedRead(MPI_File_read failed)", mpi_code, 0);
1225 return 0;
1226 }
1227 return 1;
1228 }
1229 /* fill the buffer */
1230 mpi_code = MPI_File_read(MPI_dataset->MPI_file, fBuffer->data, (int)(fBuffer->bufferSize), MPI_BYTE, &status);
1231 MPI_Get_count(&status, MPI_BYTE, &count);
1232 fBuffer->bytesLeft = count;
1233 if (!(fBuffer->bytesLeft))
1234 MPI_dataset->end_of_file = 1;
1235 if (fBuffer->bytesLeft < bytesNeeded)
1236 return 0;
1237 if (target)
1238 memcpy((char *)target + offset, (char *)fBuffer->data, bytesNeeded);
1239 fBuffer->data += bytesNeeded;
1240 fBuffer->bytesLeft -= bytesNeeded;
1241 return 1;
1242 }
1243}
1244
1245/**
1246 * @brief Buffered read all from an SDDS dataset using MPI.
1247 *
1248 * This function reads all requested data from the SDDS dataset using a buffer,
1249 * ensuring that the entire requested data is read unless end-of-file is reached.
1250 * It handles buffer management and MPI collective I/O operations.
1251 *
1252 * @param target Pointer to the buffer where the read data will be stored.
1253 * @param targetSize The size of the data to read, in bytes.
1254 * @param SDDS_dataset Pointer to the SDDS_DATASET structure.
1255 * @param fBuffer Pointer to the SDDS_FILEBUFFER structure managing the buffer.
1256 * @return
1257 * - `1` on successful read of all requested data.
1258 * - `0` on partial read or failure.
1259 * - `-1` if end-of-file is reached.
1260 */
1261int32_t SDDS_MPI_BufferedReadAll(void *target, int64_t targetSize, SDDS_DATASET *SDDS_dataset, SDDS_FILEBUFFER *fBuffer) {
1262 int32_t mpi_code, bytesRead, count;
1263 MPI_Status status;
1264 MPI_DATASET *MPI_dataset = SDDS_dataset->MPI_dataset;
1265
1266 if (!fBuffer || !fBuffer->bufferSize) {
1267 /* just read into users buffer or seek if no buffer given */
1268 if (!target)
1269 mpi_code = MPI_File_seek(MPI_dataset->MPI_file, targetSize, MPI_SEEK_CUR);
1270 else {
1271 mpi_code = MPI_File_read_all(MPI_dataset->MPI_file, target, targetSize, MPI_BYTE, &status);
1272 MPI_Get_count(&status, MPI_BYTE, &bytesRead);
1273 if (!bytesRead) {
1274 MPI_dataset->end_of_file = 1;
1275 return -1;
1276 }
1277 if (bytesRead < targetSize)
1278 return 0;
1279 }
1280 if (mpi_code != MPI_SUCCESS) {
1281 SDDS_MPI_GOTO_ERROR(stderr, "SDDS_MPI_BufferedRead(MPI_File_read failed)", mpi_code, 0);
1282 return 0;
1283 }
1284 return 1;
1285 }
1286 if ((fBuffer->bytesLeft -= targetSize) >= 0) {
1287 /* sufficient data is already in the buffer */
1288 if (target) {
1289 memcpy((char *)target, (char *)fBuffer->data, targetSize);
1290 }
1291 fBuffer->data += targetSize;
1292 return 1;
1293 } else {
1294 /* need to read additional data into buffer */
1295 int64_t bytesNeeded, offset;
1296 fBuffer->bytesLeft += targetSize; /* adds back amount subtracted above */
1297 /* first, use the data that is already available. this cleans out the buffer */
1298 if ((offset = fBuffer->bytesLeft)) {
1299 /* some data is available in the buffer */
1300 if (target) {
1301 memcpy((char *)target, (char *)fBuffer->data, offset);
1302 }
1303 bytesNeeded = targetSize - offset;
1304 fBuffer->bytesLeft = 0;
1305 } else {
1306 bytesNeeded = targetSize;
1307 }
1308 fBuffer->data = fBuffer->buffer;
1309 if (fBuffer->bufferSize < bytesNeeded) {
1310 /* just read what is needed directly into user's memory or seek */
1311 if (!target)
1312 mpi_code = MPI_File_seek(MPI_dataset->MPI_file, targetSize, MPI_SEEK_CUR);
1313 else {
1314 mpi_code = MPI_File_read_all(MPI_dataset->MPI_file, (char *)target + offset, bytesNeeded, MPI_BYTE, &status);
1315 MPI_Get_count(&status, MPI_BYTE, &bytesRead);
1316 if (!bytesRead) {
1317 MPI_dataset->end_of_file = 1;
1318 return -1;
1319 }
1320 if (bytesRead < bytesNeeded)
1321 return 0;
1322 }
1323 if (mpi_code != MPI_SUCCESS) {
1324 SDDS_MPI_GOTO_ERROR(stderr, "SDDS_MPI_ReadBufferedRead(MPI_File_read failed)", mpi_code, 0);
1325 return 0;
1326 }
1327 return 1;
1328 }
1329 /* fill the buffer */
1330 mpi_code = MPI_File_read_all(MPI_dataset->MPI_file, fBuffer->data, (int)(fBuffer->bufferSize), MPI_BYTE, &status);
1331 MPI_Get_count(&status, MPI_BYTE, &count);
1332 fBuffer->bytesLeft = count;
1333 if (!(fBuffer->bytesLeft))
1334 MPI_dataset->end_of_file = 1;
1335 if (fBuffer->bytesLeft < bytesNeeded)
1336 return 0;
1337 if (target)
1338 memcpy((char *)target + offset, (char *)fBuffer->data, bytesNeeded);
1339 fBuffer->data += bytesNeeded;
1340 fBuffer->bytesLeft -= bytesNeeded;
1341 return 1;
1342 }
1343}
1344
1345/**
1346 * @brief Read a binary string from the SDDS dataset using MPI.
1347 *
1348 * This function reads a string from the SDDS dataset in binary format using MPI.
1349 * It first reads the length of the string, allocates memory for the string,
1350 * and then reads the string data itself. If the `skip` parameter is set,
1351 * the string data is read and discarded.
1352 *
1353 * @param SDDS_dataset Pointer to the SDDS_DATASET structure.
1354 * @param fBuffer Pointer to the SDDS_FILEBUFFER structure managing the buffer.
1355 * @param skip If non-zero, the string data is read and skipped (not stored).
1356 * @return
1357 * - Pointer to the allocated string on success.
1358 * - `0` if reading the length fails or if the length is negative.
1359 * - `NULL` if memory allocation fails or reading the string data fails.
1360 */
1361char *SDDS_MPI_ReadBinaryString(SDDS_DATASET *SDDS_dataset, SDDS_FILEBUFFER *fBuffer, int32_t skip) {
1362 int32_t length;
1363 char *string;
1364 if (!SDDS_MPI_BufferedRead(&length, sizeof(length), SDDS_dataset, fBuffer) || length < 0)
1365 return (0);
1366 if (!(string = SDDS_Malloc(sizeof(*string) * (length + 1))))
1367 return (NULL);
1368 if (length && !SDDS_MPI_BufferedRead(skip ? NULL : string, sizeof(*string) * length, SDDS_dataset, fBuffer))
1369 return (NULL);
1370 string[length] = 0;
1371 return (string);
1372}
1373
1374/**
1375 * @brief Read a non-native binary string from the SDDS dataset using MPI.
1376 *
1377 * This function reads a string from the SDDS dataset in non-native binary format using MPI.
1378 * It handles byte swapping for the string length to match the non-native byte order.
1379 * After reading the length, it allocates memory for the string and reads the string data.
1380 * If the `skip` parameter is set, the string data is read and discarded.
1381 *
1382 * @param SDDS_dataset Pointer to the SDDS_DATASET structure.
1383 * @param fBuffer Pointer to the SDDS_FILEBUFFER structure managing the buffer.
1384 * @param skip If non-zero, the string data is read and skipped (not stored).
1385 * @return
1386 * - Pointer to the allocated string on success.
1387 * - `0` if reading the length fails or if the length is negative.
1388 * - `NULL` if memory allocation fails or reading the string data fails.
1389 */
1390char *SDDS_MPI_ReadNonNativeBinaryString(SDDS_DATASET *SDDS_dataset, SDDS_FILEBUFFER *fBuffer, int32_t skip) {
1391 int32_t length;
1392 char *string;
1393
1394 if (!SDDS_MPI_BufferedRead(&length, sizeof(length), SDDS_dataset, fBuffer))
1395 return (0);
1396 SDDS_SwapLong(&length);
1397 if (length < 0)
1398 return (0);
1399 if (!(string = SDDS_Malloc(sizeof(*string) * (length + 1))))
1400 return (NULL);
1401 if (length && !SDDS_MPI_BufferedRead(skip ? NULL : string, sizeof(*string) * length, SDDS_dataset, fBuffer))
1402 return (NULL);
1403 string[length] = 0;
1404 return (string);
1405}
1406
1407/**
1408 * @brief Read binary parameters from the SDDS dataset using MPI.
1409 *
1410 * This function reads all non-fixed parameters from the SDDS dataset in binary format using MPI.
1411 * It iterates through each parameter, handling string parameters by reading them as binary strings,
1412 * and other types by reading their binary representations directly into the dataset's parameter storage.
1413 *
1414 * @param SDDS_dataset Pointer to the SDDS_DATASET structure.
1415 * @param fBuffer Pointer to the SDDS_FILEBUFFER structure managing the buffer.
1416 * @return
1417 * - `1` on successful read of all parameters.
1418 * - `0` on failure to read any parameter.
1419 */
1421 int32_t i;
1422 SDDS_LAYOUT *layout;
1423 /* char *predefined_format; */
1424 static char buffer[SDDS_MAXLINE];
1425
1426 if (!SDDS_CheckDataset(SDDS_dataset, "SDDS_MPI_ReadBinaryParameters"))
1427 return (0);
1428 layout = &SDDS_dataset->layout;
1429 if (!layout->n_parameters)
1430 return (1);
1431 for (i = 0; i < layout->n_parameters; i++) {
1432 if (layout->parameter_definition[i].definition_mode & SDDS_WRITEONLY_DEFINITION)
1433 continue;
1434 if (layout->parameter_definition[i].fixed_value) {
1435 strcpy(buffer, layout->parameter_definition[i].fixed_value);
1436 if (!SDDS_ScanData(buffer, layout->parameter_definition[i].type, 0, SDDS_dataset->parameter[i], 0, 1)) {
1437 SDDS_SetError("Unable to read page--parameter scanning error (SDDS_MPI_ReadBinaryParameters)");
1438 return (0);
1439 }
1440 } else if (layout->parameter_definition[i].type == SDDS_STRING) {
1441 if (*(char **)SDDS_dataset->parameter[i])
1442 free(*(char **)SDDS_dataset->parameter[i]);
1443 if (!(*((char **)SDDS_dataset->parameter[i]) = SDDS_MPI_ReadBinaryString(SDDS_dataset, fBuffer, 0))) {
1444 SDDS_SetError("Unable to read parameters--failure reading string (SDDS_MPI_ReadBinaryParameters)");
1445 return (0);
1446 }
1447 } else {
1448 if (!SDDS_MPI_BufferedRead(SDDS_dataset->parameter[i], SDDS_type_size[layout->parameter_definition[i].type - 1], SDDS_dataset, fBuffer)) {
1449 SDDS_SetError("Unable to read parameters--failure reading value (SDDS_MPI_ReadBinaryParameters)");
1450 return (0);
1451 }
1452 }
1453 }
1454 return (1);
1455}
1456
1457/**
1458 * @brief Read a binary row from the SDDS dataset using MPI.
1459 *
1460 * This function reads a specific row of data from the SDDS dataset in binary format using MPI.
1461 * It handles different column types, including strings and fixed-size data types. If the `skip` parameter
1462 * is set, string data is read and discarded.
1463 *
1464 * @param SDDS_dataset Pointer to the SDDS_DATASET structure.
1465 * @param row The row index to read.
1466 * @param skip If non-zero, string data in the row is read but not stored.
1467 * @return
1468 * - `1` on successful read of the row.
1469 * - `0` on failure to read any part of the row.
1470 */
1471int32_t SDDS_MPI_ReadBinaryRow(SDDS_DATASET *SDDS_dataset, int64_t row, int32_t skip) {
1472 int64_t i;
1473 int32_t type, size;
1474 SDDS_LAYOUT *layout;
1475 SDDS_FILEBUFFER *fBuffer;
1476
1477 if (!SDDS_CheckDataset(SDDS_dataset, "SDDS_MPI_ReadBinaryRow"))
1478 return (0);
1479 layout = &SDDS_dataset->layout;
1480 fBuffer = &SDDS_dataset->fBuffer;
1481
1482 for (i = 0; i < layout->n_columns; i++) {
1483 if (layout->column_definition[i].definition_mode & SDDS_WRITEONLY_DEFINITION)
1484 continue;
1485 if ((type = layout->column_definition[i].type) == SDDS_STRING) {
1486 if (!skip) {
1487 if (((char ***)SDDS_dataset->data)[i][row])
1488 free((((char ***)SDDS_dataset->data)[i][row]));
1489 if (!(((char ***)SDDS_dataset->data)[i][row] = SDDS_MPI_ReadBinaryString(SDDS_dataset, fBuffer, 0))) {
1490 SDDS_SetError("Unable to read rows--failure reading string (SDDS_MPI_ReadBinaryRows)");
1491 return (0);
1492 }
1493 } else {
1494 if (!(((char ***)SDDS_dataset->data)[i][row] = SDDS_MPI_ReadBinaryString(SDDS_dataset, fBuffer, 1))) {
1495 SDDS_SetError("Unable to read rows--failure reading string (SDDS_MPI_ReadBinaryRows)");
1496 return 0;
1497 }
1498 }
1499 } else {
1500 size = SDDS_type_size[type - 1];
1501 if (!SDDS_MPI_BufferedRead(skip ? NULL : (char *)SDDS_dataset->data[i] + row * size, size, SDDS_dataset, fBuffer)) {
1502 SDDS_SetError("Unable to read row--failure reading value (SDDS_MPI_ReadBinaryRow)");
1503 return (0);
1504 }
1505 }
1506 }
1507 return (1);
1508}
1509
1510/**
1511 * @brief Read binary arrays from the SDDS dataset using MPI.
1512 *
1513 * This function reads all arrays defined in the SDDS dataset in binary format using MPI.
1514 * It handles reading array dimensions, allocating memory for array data, and reading
1515 * the actual array data. For string arrays, it reads each string individually.
1516 *
1517 * @param SDDS_dataset Pointer to the SDDS_DATASET structure.
1518 * @param fBuffer Pointer to the SDDS_FILEBUFFER structure managing the buffer.
1519 * @return
1520 * - `1` on successful read of all arrays.
1521 * - `0` on failure to read any part of the arrays.
1522 */
1524 int32_t i, j;
1525 SDDS_LAYOUT *layout;
1526 /* char *predefined_format; */
1527 /* static char buffer[SDDS_MAXLINE]; */
1528 SDDS_ARRAY *array;
1529
1530 if (!SDDS_CheckDataset(SDDS_dataset, "SDDS_MPI_ReadBinaryArrays"))
1531 return (0);
1532 layout = &SDDS_dataset->layout;
1533 if (!layout->n_arrays)
1534 return (1);
1535
1536 if (!SDDS_dataset->array) {
1537 SDDS_SetError("Unable to read array--pointer to structure storage area is NULL (SDDS_MPI_ReadBinaryArrays)");
1538 return (0);
1539 }
1540 for (i = 0; i < layout->n_arrays; i++) {
1541 array = SDDS_dataset->array + i;
1542 if (array->definition && !SDDS_FreeArrayDefinition(array->definition)) {
1543 SDDS_SetError("Unable to get array--array definition corrupted (SDDS_MPI_ReadBinaryArrays)");
1544 return (0);
1545 }
1546 if (!SDDS_CopyArrayDefinition(&array->definition, layout->array_definition + i)) {
1547 SDDS_SetError("Unable to read array--definition copy failed (SDDS_MPI_ReadBinaryArrays)");
1548 return (0);
1549 }
1550 /*if (array->dimension) free(array->dimension); */
1551 if (!(array->dimension = SDDS_Realloc(array->dimension, sizeof(*array->dimension) * array->definition->dimensions))) {
1552 SDDS_SetError("Unable to read array--allocation failure (SDDS_MPI_ReadBinaryArrays)");
1553 return (0);
1554 }
1555 if (!SDDS_MPI_BufferedRead(array->dimension, sizeof(*array->dimension) * array->definition->dimensions, SDDS_dataset, fBuffer)) {
1556 SDDS_SetError("Unable to read arrays--failure reading dimensions (SDDS_MPI_ReadBinaryArrays)");
1557 return (0);
1558 }
1559 array->elements = 1;
1560 for (j = 0; j < array->definition->dimensions; j++)
1561 array->elements *= array->dimension[j];
1562 if (array->data)
1563 free(array->data);
1564 array->data = array->pointer = NULL;
1565 if (array->elements == 0)
1566 continue;
1567 if (array->elements < 0) {
1568 SDDS_SetError("Unable to read array--number of elements is negative (SDDS_MPI_ReadBinaryArrays)");
1569 return (0);
1570 }
1571 if (!(array->data = SDDS_Realloc(array->data, array->elements * SDDS_type_size[array->definition->type - 1]))) {
1572 SDDS_SetError("Unable to read array--allocation failure (SDDS_MPI_ReadBinaryArrays)");
1573 return (0);
1574 }
1575 if (array->definition->type == SDDS_STRING) {
1576 for (j = 0; j < array->elements; j++) {
1577 if (!(((char **)(array->data))[j] = SDDS_MPI_ReadBinaryString(SDDS_dataset, fBuffer, 0))) {
1578 SDDS_SetError("Unable to read arrays--failure reading string (SDDS_MPI_ReadBinaryArrays)");
1579 return (0);
1580 }
1581 }
1582 } else {
1583 if (!SDDS_MPI_BufferedRead(array->data, SDDS_type_size[array->definition->type - 1] * array->elements, SDDS_dataset, fBuffer)) {
1584 SDDS_SetError("Unable to read arrays--failure reading values (SDDS_MPI_ReadBinaryArrays)");
1585 return (0);
1586 }
1587 }
1588 }
1589 return (1);
1590}
1591
1592/**
1593 * @brief Read non-native binary parameters from the SDDS dataset using MPI.
1594 *
1595 * This function reads all non-fixed parameters from the SDDS dataset in non-native
1596 * binary format using MPI. It handles byte swapping for parameter data to match
1597 * the non-native byte order. String parameters are read as non-native binary strings,
1598 * and other types are read directly into the dataset's parameter storage.
1599 *
1600 * @param SDDS_dataset Pointer to the SDDS_DATASET structure.
1601 * @param fBuffer Pointer to the SDDS_FILEBUFFER structure managing the buffer.
1602 * @return
1603 * - `1` on successful read of all parameters.
1604 * - `0` on failure to read any parameter.
1605 */
1607 int32_t i;
1608 SDDS_LAYOUT *layout;
1609 static char buffer[SDDS_MAXLINE];
1610
1611 if (!SDDS_CheckDataset(SDDS_dataset, "SDDS_MPI_ReadNonNativeBinaryParameters"))
1612 return (0);
1613 layout = &SDDS_dataset->layout;
1614 if (!layout->n_parameters)
1615 return (1);
1616 for (i = 0; i < layout->n_parameters; i++) {
1617 if (layout->parameter_definition[i].definition_mode & SDDS_WRITEONLY_DEFINITION)
1618 continue;
1619 if (layout->parameter_definition[i].fixed_value) {
1620 strcpy(buffer, layout->parameter_definition[i].fixed_value);
1621 if (!SDDS_ScanData(buffer, layout->parameter_definition[i].type, 0, SDDS_dataset->parameter[i], 0, 1)) {
1622 SDDS_SetError("Unable to read page--parameter scanning error (SDDS_MPI_ReadNonNativeBinaryParameters)");
1623 return (0);
1624 }
1625 } else if (layout->parameter_definition[i].type == SDDS_STRING) {
1626 if (*(char **)SDDS_dataset->parameter[i])
1627 free(*(char **)SDDS_dataset->parameter[i]);
1628 if (!(*((char **)SDDS_dataset->parameter[i]) = SDDS_MPI_ReadNonNativeBinaryString(SDDS_dataset, fBuffer, 0))) {
1629 SDDS_SetError("Unable to read parameters--failure reading string (SDDS_MPI_ReadNonNativeBinaryParameters)");
1630 return (0);
1631 }
1632 } else {
1633 if (!SDDS_MPI_BufferedRead(SDDS_dataset->parameter[i], SDDS_type_size[layout->parameter_definition[i].type - 1], SDDS_dataset, fBuffer)) {
1634 SDDS_SetError("Unable to read parameters--failure reading value (SDDS_MPI_ReadNonNativeBinaryParameters)");
1635 return (0);
1636 }
1637 }
1638 }
1639 SDDS_SwapEndsParameterData(SDDS_dataset);
1640 return (1);
1641}
1642
1643/**
1644 * @brief Reads non-native binary arrays from a binary file buffer into the SDDS dataset using MPI.
1645 *
1646 * This function validates the provided SDDS dataset and reads array definitions and data from a non-native binary file buffer.
1647 * It handles byte swapping for endianness, allocates memory for array dimensions and data, and processes string arrays individually.
1648 *
1649 * @param[in,out] SDDS_dataset Pointer to the SDDS_DATASET structure where the arrays will be stored.
1650 * @param[in] fBuffer Pointer to the SDDS_FILEBUFFER structure containing the binary data to be read.
1651 *
1652 * @return int32_t Returns 1 on successful read, 0 on failure.
1653 */
1655 int32_t i, j;
1656 SDDS_LAYOUT *layout;
1657 SDDS_ARRAY *array;
1658
1659 if (!SDDS_CheckDataset(SDDS_dataset, "SDDS_MPI_ReadNonNativeBinaryArrays"))
1660 return (0);
1661 layout = &SDDS_dataset->layout;
1662 if (!layout->n_arrays)
1663 return (1);
1664
1665 if (!SDDS_dataset->array) {
1666 SDDS_SetError("Unable to read array--pointer to structure storage area is NULL (SDDS_MPI_ReadNonNativeBinaryArrays)");
1667 return (0);
1668 }
1669 for (i = 0; i < layout->n_arrays; i++) {
1670 array = SDDS_dataset->array + i;
1671 if (array->definition && !SDDS_FreeArrayDefinition(array->definition)) {
1672 SDDS_SetError("Unable to get array--array definition corrupted (SDDS_MPI_ReadNonNativeBinaryArrays)");
1673 return (0);
1674 }
1675 if (!SDDS_CopyArrayDefinition(&array->definition, layout->array_definition + i)) {
1676 SDDS_SetError("Unable to read array--definition copy failed (SDDS_MPI_ReadNonNativeBinaryArrays)");
1677 return (0);
1678 }
1679 /*if (array->dimension) free(array->dimension); */
1680 if (!(array->dimension = SDDS_Realloc(array->dimension, sizeof(*array->dimension) * array->definition->dimensions))) {
1681 SDDS_SetError("Unable to read array--allocation failure (SDDS_MPI_ReadNonNativeBinaryArrays)");
1682 return (0);
1683 }
1684 if (!SDDS_MPI_BufferedRead(array->dimension, sizeof(*array->dimension) * array->definition->dimensions, SDDS_dataset, fBuffer)) {
1685 SDDS_SetError("Unable to read arrays--failure reading dimensions (SDDS_MPI_ReadNonNativeBinaryArrays)");
1686 return (0);
1687 }
1688 SDDS_SwapLong(array->dimension);
1689 array->elements = 1;
1690 for (j = 0; j < array->definition->dimensions; j++)
1691 array->elements *= array->dimension[j];
1692 if (array->data)
1693 free(array->data);
1694 array->data = array->pointer = NULL;
1695 if (array->elements == 0)
1696 continue;
1697 if (array->elements < 0) {
1698 SDDS_SetError("Unable to read array--number of elements is negative (SDDS_MPI_ReadNonNativeBinaryArrays)");
1699 return (0);
1700 }
1701 if (!(array->data = SDDS_Realloc(array->data, array->elements * SDDS_type_size[array->definition->type - 1]))) {
1702 SDDS_SetError("Unable to read array--allocation failure (SDDS_MPI_ReadNonNativeBinaryArrays)");
1703 return (0);
1704 }
1705 if (array->definition->type == SDDS_STRING) {
1706 for (j = 0; j < array->elements; j++) {
1707 if (!(((char **)(array->data))[j] = SDDS_MPI_ReadNonNativeBinaryString(SDDS_dataset, fBuffer, 0))) {
1708 SDDS_SetError("Unable to read arrays--failure reading string (SDDS_MPI_ReadNonNativeBinaryArrays)");
1709 return (0);
1710 }
1711 }
1712 } else {
1713 if (!SDDS_MPI_BufferedRead(array->data, SDDS_type_size[array->definition->type - 1] * array->elements, SDDS_dataset, fBuffer)) {
1714 SDDS_SetError("Unable to read arrays--failure reading values (SDDS_MPI_ReadNonNativeBinaryArrays)");
1715 return (0);
1716 }
1717 }
1718 }
1719 SDDS_SwapEndsArrayData(SDDS_dataset);
1720 return (1);
1721}
1722
1723/**
1724 * @brief Reads a single non-native binary row from a binary file buffer into the SDDS dataset using MPI.
1725 *
1726 * This function reads the data for a specific row from the binary file buffer. It handles different data types, including strings,
1727 * and can optionally skip reading the row data. If skipping, it advances the file buffer without storing the data.
1728 *
1729 * @param[in,out] SDDS_dataset Pointer to the SDDS_DATASET structure where the row data will be stored.
1730 * @param[in] row The index of the row to read.
1731 * @param[in] skip If non-zero, the function will skip reading the row data without storing it.
1732 *
1733 * @return int32_t Returns 1 on successful read, 0 on failure.
1734 */
1735int32_t SDDS_MPI_ReadNonNativeBinaryRow(SDDS_DATASET *SDDS_dataset, int64_t row, int32_t skip) {
1736 int64_t i;
1737 int32_t type, size;
1738 SDDS_LAYOUT *layout;
1739 SDDS_FILEBUFFER *fBuffer;
1740
1741 if (!SDDS_CheckDataset(SDDS_dataset, "SDDS_MPI_ReadNonNativeBinaryRow"))
1742 return (0);
1743 layout = &SDDS_dataset->layout;
1744 fBuffer = &SDDS_dataset->fBuffer;
1745
1746 for (i = 0; i < layout->n_columns; i++) {
1747 if (layout->column_definition[i].definition_mode & SDDS_WRITEONLY_DEFINITION)
1748 continue;
1749 if ((type = layout->column_definition[i].type) == SDDS_STRING) {
1750 if (!skip) {
1751 if (((char ***)SDDS_dataset->data)[i][row])
1752 free((((char ***)SDDS_dataset->data)[i][row]));
1753 if (!(((char ***)SDDS_dataset->data)[i][row] = SDDS_MPI_ReadNonNativeBinaryString(SDDS_dataset, fBuffer, 0))) {
1754 SDDS_SetError("Unable to read rows--failure reading string (SDDS_MPI_ReadNonNativeBinaryRow)");
1755 return (0);
1756 }
1757 } else {
1758 if (!SDDS_MPI_ReadNonNativeBinaryString(SDDS_dataset, fBuffer, 1)) {
1759 SDDS_SetError("Unable to read rows--failure reading string (SDDS_MPI_ReadNonNativeBinaryRow)");
1760 return 0;
1761 }
1762 }
1763 } else {
1764 size = SDDS_type_size[type - 1];
1765 if (!SDDS_MPI_BufferedRead(skip ? NULL : (char *)SDDS_dataset->data[i] + row * size, size, SDDS_dataset, fBuffer)) {
1766 SDDS_SetError("Unable to read row--failure reading value (SDDS_MPI_ReadNonNativeBinaryRow)");
1767 return (0);
1768 }
1769 }
1770 }
1771 return (1);
1772}
1773
1774/**
1775 * @brief Broadcasts the title data (parameters and arrays) of the SDDS dataset to all MPI processes.
1776 *
1777 * This function gathers the title data from the master process and broadcasts it to all other MPI processes.
1778 * It handles the total number of rows, parameters, and array definitions, ensuring that each process has consistent dataset metadata.
1779 *
1780 * @param[in,out] SDDS_dataset Pointer to the SDDS_DATASET structure containing the title data to broadcast.
1781 *
1782 * @return int32_t Returns 1 on successful broadcast, 0 on failure.
1783 */
1785 char *par_data = NULL;
1786 int32_t type, size;
1787 int64_t i, count = 0, *data_len = NULL;
1788 MPI_DATASET *MPI_dataset;
1789 SDDS_LAYOUT *layout;
1790 char *string = NULL;
1791
1792 MPI_dataset = SDDS_dataset->MPI_dataset;
1793 layout = &(SDDS_dataset->layout);
1794 if (!layout->n_parameters && !layout->n_arrays) {
1795 /* no parameters and arrays, only broadcast total_rows */
1796 MPI_Bcast(&(MPI_dataset->total_rows), 1, MPI_INT64_T, 0, MPI_dataset->comm);
1797 } else {
1798 /* broadcast the total_rows and parameter data */
1799 data_len = calloc(sizeof(*data_len), 1 + layout->n_parameters);
1800 if (MPI_dataset->myid == 0) {
1801 data_len[0] = sizeof(int64_t);
1802 count = data_len[0];
1803 for (i = 0; i < layout->n_parameters; i++) {
1804 type = layout->parameter_definition[i].type;
1805 if (type == SDDS_STRING)
1806 data_len[i + 1] = strlen(*((char **)SDDS_dataset->parameter[i])) * sizeof(char);
1807 else
1808 data_len[i + 1] = SDDS_type_size[type - 1];
1809 ;
1810 count += data_len[i + 1];
1811 }
1812 par_data = (char *)malloc(sizeof(char) * count);
1813 memcpy((char *)par_data, &(MPI_dataset->total_rows), data_len[0]);
1814 count = data_len[0];
1815 for (i = 0; i < layout->n_parameters; i++) {
1816 if (layout->parameter_definition[i].type == SDDS_STRING)
1817 memcpy((char *)par_data + count, *(char **)SDDS_dataset->parameter[i], data_len[i + 1]);
1818 else
1819 memcpy((char *)par_data + count, (char *)SDDS_dataset->parameter[i], data_len[i + 1]);
1820 count += data_len[i + 1];
1821 }
1822 }
1823 MPI_Bcast(data_len, 1 + layout->n_parameters, MPI_INT64_T, 0, MPI_dataset->comm);
1824 if (MPI_dataset->myid) {
1825 count = data_len[0];
1826 for (i = 0; i < layout->n_parameters; i++)
1827 count += data_len[i + 1];
1828 par_data = (char *)malloc(sizeof(char) * count);
1829 }
1830
1831 MPI_Bcast(par_data, count, MPI_BYTE, 0, MPI_dataset->comm);
1832 if (!SDDS_StartPage(SDDS_dataset, 0)) {
1833 SDDS_SetError("Unable to read page--couldn't start page (SDDS_MPI_BroadcastTitleData)");
1834 return (0);
1835 }
1836 if (MPI_dataset->myid) {
1837 memcpy(&(MPI_dataset->total_rows), (char *)par_data, data_len[0]);
1838 count = data_len[0];
1839 for (i = 0; i < layout->n_parameters; i++) {
1840 if (layout->parameter_definition[i].type == SDDS_STRING) {
1841 string = malloc(sizeof(char) * (data_len[i + 1] + 1));
1842 memcpy((char *)string, (char *)par_data + count, data_len[i + 1]);
1843 string[data_len[i + 1]] = '\0';
1844 *(char **)SDDS_dataset->parameter[i] = string;
1845 } else
1846 memcpy((char *)(SDDS_dataset->parameter[i]), (char *)par_data + count, data_len[i + 1]);
1847 count += data_len[i + 1];
1848 }
1849 }
1850 }
1851 free(data_len);
1852 free(par_data);
1853 data_len = NULL;
1854 par_data = NULL;
1855 if (layout->n_arrays) {
1856 data_len = malloc(sizeof(*data_len) * layout->n_arrays);
1857 if (MPI_dataset->myid == 0) {
1858 for (i = 0; i < layout->n_arrays; i++)
1859 data_len[i] = layout->array_definition[i].dimensions;
1860 }
1861 MPI_Bcast(data_len, layout->n_arrays, MPI_INT64_T, 0, MPI_dataset->comm);
1862 for (i = 0; i < layout->n_arrays; i++) {
1863 type = layout->array_definition[i].type;
1864 size = SDDS_type_size[type - 1];
1865 if (data_len[i]) {
1866 if (type == SDDS_STRING) {
1867 if (MPI_dataset->myid == 0) {
1868 /* it is not easy to broad cast string array, will implement it in the future */
1869 }
1870 } else
1871 MPI_Bcast((char *)SDDS_dataset->array[i].data, data_len[i] * size, MPI_BYTE, 0, MPI_dataset->comm);
1872 }
1873 }
1874 }
1875 /*MPI_dataset->file_offset += SDDS_MPI_GetTitleOffset(MPI_dataset); */
1876 return 1;
1877}
1878
1879/*flag master_read: 1 master processor will read rows; 0: master processor does not read rows.*/
1880/**
1881 * @brief Reads a binary page from an SDDS dataset using MPI parallel I/O.
1882 *
1883 * This function reads a page of data from a binary SDDS file in parallel using MPI. It handles reading the page title,
1884 * distributing the rows among MPI processes, and reading the column or row data depending on the dataset's data mode.
1885 * It also manages error handling and supports read recovery.
1886 *
1887 * @param[in,out] SDDS_dataset Pointer to the SDDS_DATASET structure where the page data will be stored.
1888 *
1889 * @return int32_t Returns the page number on success, 0 on failure, or -1 if end-of-file is reached.
1890 */
1892 int32_t mpi_code, type = 0, retval, master_read;
1893 int64_t i, j, n_rows, prev_rows;
1894 MPI_DATASET *MPI_dataset;
1895 SDDS_FILEBUFFER *fBuffer;
1896 MPI_Status status;
1897 MPI_Offset offset;
1898 long ID_offset;
1899
1900 MPI_dataset = SDDS_dataset->MPI_dataset;
1901 master_read = MPI_dataset->master_read;
1902
1903 if (SDDS_dataset->autoRecovered)
1904 return -1;
1905 if (SDDS_dataset->swapByteOrder) {
1906 return SDDS_MPI_ReadNonNativeBinaryPage(SDDS_dataset);
1907 }
1908 /* static char s[SDDS_MAXLINE]; */
1909 n_rows = 0;
1910 SDDS_SetReadRecoveryMode(SDDS_dataset, 0);
1911 if (MPI_dataset->file_offset >= MPI_dataset->file_size)
1912 return (SDDS_dataset->page_number = -1);
1913 if ((mpi_code = MPI_File_set_view(MPI_dataset->MPI_file, MPI_dataset->file_offset, MPI_BYTE, MPI_BYTE, "native", MPI_INFO_NULL)) != MPI_SUCCESS) {
1914 SDDS_MPI_GOTO_ERROR(stderr, "Unable to set view for read binary page", mpi_code, 0);
1915 SDDS_SetError("Unable to set view for read binary page(1)");
1916 return 0;
1917 }
1918#if defined(MASTER_READTITLE_ONLY)
1919 if (MPI_dataset->myid == 0)
1920#endif
1921 retval = SDDS_MPI_BufferedReadBinaryTitle(SDDS_dataset);
1922#if defined(MASTER_READTITLE_ONLY)
1923 MPI_Bcast(&retval, 1, MPI_INT, 0, MPI_dataset->comm);
1924#endif
1925 /*end of file reached */
1926 if (retval < 0)
1927 return (SDDS_dataset->page_number = -1);
1928 if (retval == 0) {
1929 SDDS_SetError("Unable to read the SDDS title (row number, parameter and/or array) data");
1930 return 0;
1931 }
1932#if defined(MASTER_READTITLE_ONLY)
1933 SDDS_MPI_BroadcastTitleData(SDDS_dataset);
1934#endif
1935 MPI_dataset->file_offset += SDDS_MPI_GetTitleOffset(SDDS_dataset);
1936 if (MPI_dataset->total_rows < 0) {
1937 SDDS_SetError("Unable to read page--negative number of rows (SDDS_MPI_ReadBinaryPage)");
1938 return (0);
1939 }
1940 if (MPI_dataset->total_rows > SDDS_GetRowLimit()) {
1941 /* the number of rows is "unreasonably" large---treat like end-of-file */
1942 return (SDDS_dataset->page_number = -1);
1943 }
1944 prev_rows = 0;
1945 if (master_read) {
1946 n_rows = MPI_dataset->total_rows / MPI_dataset->n_processors;
1947 prev_rows = MPI_dataset->myid * n_rows;
1948 if (MPI_dataset->myid < (ID_offset = MPI_dataset->total_rows % (MPI_dataset->n_processors))) {
1949 n_rows++;
1950 prev_rows += MPI_dataset->myid;
1951 } else
1952 prev_rows += ID_offset;
1953 } else {
1954 if (MPI_dataset->myid == 0)
1955 n_rows = 0;
1956 else {
1957 n_rows = MPI_dataset->total_rows / (MPI_dataset->n_processors - 1);
1958 prev_rows = (MPI_dataset->myid - 1) * n_rows;
1959 if (MPI_dataset->myid <= (ID_offset = MPI_dataset->total_rows % (MPI_dataset->n_processors - 1))) {
1960 n_rows++;
1961 prev_rows += (MPI_dataset->myid - 1);
1962 } else
1963 prev_rows += ID_offset;
1964 }
1965 }
1966 MPI_dataset->start_row = prev_rows; /* This number will be used as the paritlce ID offset */
1967 if (!SDDS_StartPage(SDDS_dataset, 0) || !SDDS_LengthenTable(SDDS_dataset, n_rows)) {
1968 SDDS_SetError("Unable to read page--couldn't start page (SDDS_MPI_ReadBinaryPage)");
1969 return (0);
1970 }
1971 offset = MPI_dataset->file_offset;
1972 fBuffer = &SDDS_dataset->fBuffer;
1973
1974 if (SDDS_dataset->layout.data_mode.column_major) {
1975 /*read by column buffer is not need */
1976 for (i = 0; i < SDDS_dataset->layout.n_columns; i++) {
1977 type = SDDS_dataset->layout.column_definition[i].type;
1978 if (type == SDDS_STRING) {
1979 SDDS_SetError("Can not read string column from SDDS3 (SDDS_MPI_ReadBinaryPage");
1980 return 0;
1981 }
1982 MPI_dataset->file_offset = offset + (MPI_Offset)prev_rows * SDDS_type_size[type - 1];
1983 if ((mpi_code = MPI_File_set_view(MPI_dataset->MPI_file, MPI_dataset->file_offset, MPI_BYTE, MPI_BYTE, "native", MPI_INFO_NULL)) != MPI_SUCCESS) {
1984 SDDS_SetError("Unable to set view for read binary columns");
1985 return 0;
1986 }
1987 if (!MPI_dataset->collective_io) {
1988 if ((mpi_code = MPI_File_read(MPI_dataset->MPI_file, (char *)SDDS_dataset->data[i], n_rows * SDDS_type_size[type - 1], MPI_BYTE, &status)) != MPI_SUCCESS) {
1989 SDDS_SetError("Unable to set view for read binary columns");
1990 return 0;
1991 }
1992 } else {
1993 if ((mpi_code = MPI_File_read_all(MPI_dataset->MPI_file, (char *)SDDS_dataset->data[i], n_rows * SDDS_type_size[type - 1], MPI_BYTE, &status)) != MPI_SUCCESS) {
1994 SDDS_SetError("Unable to set view for read binary columns");
1995 return 0;
1996 }
1997 }
1998 offset += (MPI_Offset)MPI_dataset->total_rows * SDDS_type_size[type - 1];
1999 }
2000 MPI_dataset->n_rows = SDDS_dataset->n_rows = n_rows;
2001 MPI_dataset->file_offset = offset;
2002 } else {
2003 /* read row by row */
2004 if (!fBuffer->buffer) {
2005 fBuffer->bufferSize = defaultReadBufferSize;
2006 if (!(fBuffer->buffer = fBuffer->data = SDDS_Malloc(sizeof(char) * (fBuffer->bufferSize + 1)))) {
2007 SDDS_SetError("Unable to do buffered read--allocation failure");
2008 return 0;
2009 }
2010 fBuffer->bytesLeft = 0;
2011 fBuffer->data[0] = 0;
2012 }
2013 if (fBuffer->bytesLeft > 0) {
2014 /* discard the extra data for reading next page */
2015 fBuffer->data[0] = 0;
2016 fBuffer->bytesLeft = 0;
2017 }
2018 MPI_dataset->file_offset += (MPI_Offset)prev_rows * MPI_dataset->column_offset;
2019 if ((mpi_code = MPI_File_set_view(MPI_dataset->MPI_file, MPI_dataset->file_offset, MPI_BYTE, MPI_BYTE, "native", MPI_INFO_NULL)) != MPI_SUCCESS) {
2020 SDDS_MPI_GOTO_ERROR(stderr, "Unable to set view for read binary rows", mpi_code, 0);
2021 SDDS_SetError("Unable to set view for read binary rows");
2022 return 0;
2023 }
2024 if (!master_read || !MPI_dataset->collective_io) {
2025 for (j = 0; j < n_rows; j++) {
2026 if (!SDDS_MPI_ReadBinaryRow(SDDS_dataset, j, 0)) {
2027 SDDS_dataset->n_rows = j;
2028 if (SDDS_dataset->autoRecover) {
2029#if defined(DEBUG)
2030 fprintf(stderr, "Doing auto-read recovery\n");
2031#endif
2032 SDDS_dataset->autoRecovered = 1;
2034 return (SDDS_dataset->page_number = MPI_dataset->n_page);
2035 }
2036 SDDS_SetError("Unable to read page--error reading data row (SDDS_MPI_ReadBinaryPage)");
2037 SDDS_SetReadRecoveryMode(SDDS_dataset, 1);
2038 return (0);
2039 }
2040 }
2041 MPI_dataset->n_rows = SDDS_dataset->n_rows = j;
2042 } else {
2043 MPI_dataset->n_rows = SDDS_dataset->n_rows = n_rows;
2044 if (!SDDS_MPI_CollectiveReadByRow(SDDS_dataset))
2045 return 0;
2046 }
2047 MPI_dataset->file_offset = offset + MPI_dataset->total_rows * MPI_dataset->column_offset;
2048 }
2049 MPI_dataset->n_page++;
2050 return (SDDS_dataset->page_number = MPI_dataset->n_page);
2051}
2052
2053/**
2054 * @brief Reads a non-native binary page from an SDDS dataset using MPI parallel I/O.
2055 *
2056 * This function is a wrapper that calls SDDS_MPI_ReadNonNativePageSparse with mode set to 0,
2057 * facilitating the reading of non-native binary pages.
2058 *
2059 * @param[in,out] SDDS_dataset Pointer to the SDDS_DATASET structure where the page data will be stored.
2060 *
2061 * @return int32_t Returns the page number on success, 0 on failure, or -1 if end-of-file is reached.
2062 */
2064 return SDDS_MPI_ReadNonNativePageSparse(SDDS_dataset, 0);
2065}
2066
2067/**
2068 * @brief Reads a sparse non-native binary page from an SDDS dataset using MPI parallel I/O.
2069 *
2070 * This function reads a page of data from a non-native binary SDDS file in parallel using MPI. The `mode` parameter allows for future expansion.
2071 * It handles reading the title data, broadcasting it to all MPI processes, and reading the column or row data based on the dataset's data mode.
2072 *
2073 * @param[in,out] SDDS_dataset Pointer to the SDDS_DATASET structure where the page data will be stored.
2074 * @param[in] mode Mode flag to support future expansion (currently unused).
2075 *
2076 * @return int32_t Returns the page number on success, 0 on failure, or -1 if end-of-file is reached.
2077 */
2078int32_t SDDS_MPI_ReadNonNativePageSparse(SDDS_DATASET *SDDS_dataset, uint32_t mode)
2079/* the mode argument is to support future expansion */
2080{
2081 int32_t retval;
2082
2083 if (!SDDS_CheckDataset(SDDS_dataset, "SDDS_MPI_ReadNonNativePageSparse"))
2084 return (0);
2085 if (SDDS_dataset->layout.disconnected) {
2086 SDDS_SetError("Can't read page--file is disconnected (SDDS_MPI_ReadNonNativePageSparse)");
2087 return 0;
2088 }
2089
2090 if (SDDS_dataset->original_layout.data_mode.mode == SDDS_ASCII) {
2091 SDDS_SetError("Can not read assii file with parallel io.");
2092 return 0;
2093 } else if (SDDS_dataset->original_layout.data_mode.mode == SDDS_BINARY) {
2094 if ((retval = SDDS_MPI_ReadNonNativeBinaryPage(SDDS_dataset)) < 1) {
2095 return (retval);
2096 }
2097 } else {
2098 SDDS_SetError("Unable to read page--unrecognized data mode (SDDS_MPI_ReadNonNativePageSparse)");
2099 return (0);
2100 }
2101 return (retval);
2102}
2103
2104/**
2105 * @brief Reads a non-native binary page from an SDDS dataset using MPI parallel I/O.
2106 *
2107 * This function reads a page of data from a non-native binary SDDS file in parallel using MPI. It manages reading the title data,
2108 * distributing the rows among MPI processes, and reading the column or row data depending on the dataset's data mode.
2109 * It also handles byte swapping for endianness and error management.
2110 *
2111 * @param[in,out] SDDS_dataset Pointer to the SDDS_DATASET structure where the page data will be stored.
2112 *
2113 * @return int32_t Returns the page number on success, 0 on failure, or -1 if end-of-file is reached.
2114 */
2116 int32_t ID_offset, mpi_code, master_read, type, retval;
2117 int64_t i, j, n_rows, total_rows, prev_rows;
2118 SDDS_FILEBUFFER *fBuffer;
2119 MPI_DATASET *MPI_dataset;
2120 MPI_Offset offset;
2121 MPI_Status status;
2122
2123 MPI_dataset = SDDS_dataset->MPI_dataset;
2124 master_read = MPI_dataset->master_read;
2125 /* static char s[SDDS_MAXLINE]; */
2126 n_rows = 0;
2127 SDDS_SetReadRecoveryMode(SDDS_dataset, 0);
2128 if (MPI_dataset->file_offset >= MPI_dataset->file_size)
2129 return (SDDS_dataset->page_number = -1);
2130
2131 if ((mpi_code = MPI_File_set_view(MPI_dataset->MPI_file, MPI_dataset->file_offset, MPI_BYTE, MPI_BYTE, "native", MPI_INFO_NULL)) != MPI_SUCCESS) {
2132 SDDS_MPI_GOTO_ERROR(stderr, "Unable to set view for read binary page", mpi_code, 0);
2133 SDDS_SetError("Unable to set view for read binary page(1)");
2134 return 0;
2135 }
2136 /* read the number of rows in current page */
2137#if defined(MASTER_READTITLE_ONLY)
2138 if (MPI_dataset->myid == 0)
2139#endif
2140 retval = SDDS_MPI_BufferedReadNonNativeBinaryTitle(SDDS_dataset);
2141#if defined(MASTER_READTITLE_ONLY)
2142 MPI_Bcast(&retval, 1, MPI_INT, 0, MPI_dataset->comm);
2143#endif
2144 /*end of file reached */
2145 if (retval < 0)
2146 return (SDDS_dataset->page_number = -1);
2147 if (retval == 0) {
2148 SDDS_SetError("Unable to read the SDDS title (row number, parameter and/or array) data");
2149 return 0;
2150 }
2151#if defined(MASTER_READTITLE_ONLY)
2152 SDDS_MPI_BroadcastTitleData(SDDS_dataset);
2153#endif
2154 MPI_dataset->file_offset += SDDS_MPI_GetTitleOffset(SDDS_dataset);
2155 if (MPI_dataset->total_rows < 0) {
2156 SDDS_SetError("Unable to read page--negative number of rows (SDDS_MPI_ReadBinaryPage)");
2157 return (0);
2158 }
2159 if (MPI_dataset->total_rows > SDDS_GetRowLimit()) {
2160 /* the number of rows is "unreasonably" large---treat like end-of-file */
2161 return (SDDS_dataset->page_number = -1);
2162 }
2163 total_rows = MPI_dataset->total_rows;
2164 prev_rows = 0;
2165 if (master_read) {
2166 n_rows = total_rows / MPI_dataset->n_processors;
2167 prev_rows = MPI_dataset->myid * n_rows;
2168 if (MPI_dataset->myid < (ID_offset = total_rows % (MPI_dataset->n_processors))) {
2169 n_rows++;
2170 prev_rows += MPI_dataset->myid;
2171 } else
2172 prev_rows += ID_offset;
2173 } else {
2174 if (MPI_dataset->myid == 0)
2175 n_rows = 0;
2176 else {
2177 n_rows = total_rows / (MPI_dataset->n_processors - 1);
2178 prev_rows = (MPI_dataset->myid - 1) * n_rows;
2179 if (MPI_dataset->myid <= (ID_offset = total_rows % (MPI_dataset->n_processors - 1))) {
2180 n_rows++;
2181 prev_rows += (MPI_dataset->myid - 1);
2182 } else
2183 prev_rows += ID_offset;
2184 }
2185 }
2186 MPI_dataset->start_row = prev_rows; /* This number will be used as the paritlce ID offset */
2187 if (!SDDS_StartPage(SDDS_dataset, 0) || !SDDS_LengthenTable(SDDS_dataset, n_rows)) {
2188 SDDS_SetError("Unable to read page--couldn't start page (SDDS_MPI_ReadNonNativeBinaryPage)");
2189 return (0);
2190 }
2191
2192 offset = MPI_dataset->file_offset;
2193 fBuffer = &SDDS_dataset->fBuffer;
2194 if (SDDS_dataset->layout.data_mode.column_major) {
2195 /*read by column buffer is not need */
2196 for (i = 0; i < SDDS_dataset->layout.n_columns; i++) {
2197 type = SDDS_dataset->layout.column_definition[i].type;
2198 if (type == SDDS_STRING) {
2199 SDDS_SetError("Can not read string column from SDDS3 (SDDS_MPI_ReadBinaryPage");
2200 return 0;
2201 }
2202 MPI_dataset->file_offset = offset + (MPI_Offset)prev_rows * SDDS_type_size[type - 1];
2203 if ((mpi_code = MPI_File_set_view(MPI_dataset->MPI_file, MPI_dataset->file_offset, MPI_BYTE, MPI_BYTE, "native", MPI_INFO_NULL)) != MPI_SUCCESS) {
2204 SDDS_SetError("Unable to set view for read binary columns");
2205 return 0;
2206 }
2207 if (!MPI_dataset->collective_io) {
2208 if ((mpi_code = MPI_File_read(MPI_dataset->MPI_file, (char *)SDDS_dataset->data[i], n_rows * SDDS_type_size[type - 1], MPI_BYTE, &status)) != MPI_SUCCESS) {
2209 SDDS_SetError("Unable to set view for read binary columns");
2210 return 0;
2211 }
2212 } else {
2213 if ((mpi_code = MPI_File_read_all(MPI_dataset->MPI_file, (char *)SDDS_dataset->data[i], n_rows * SDDS_type_size[type - 1], MPI_BYTE, &status)) != MPI_SUCCESS) {
2214 SDDS_SetError("Unable to set view for read binary columns");
2215 return 0;
2216 }
2217 }
2218 offset += (MPI_Offset)MPI_dataset->total_rows * SDDS_type_size[type - 1];
2219 }
2220 MPI_dataset->n_rows = SDDS_dataset->n_rows = n_rows;
2221 MPI_dataset->file_offset = offset;
2222 } else {
2223 /* read row by row */
2224 if (!fBuffer->buffer) {
2225 fBuffer->bufferSize = defaultReadBufferSize;
2226 if (!(fBuffer->buffer = fBuffer->data = SDDS_Malloc(sizeof(char) * (fBuffer->bufferSize + 1)))) {
2227 SDDS_SetError("Unable to do buffered read--allocation failure");
2228 return 0;
2229 }
2230 fBuffer->bytesLeft = 0;
2231 fBuffer->data[0] = 0;
2232 }
2233 if (fBuffer->bytesLeft > 0) {
2234 /* discard the extra data for reading next page */
2235 fBuffer->data[0] = 0;
2236 fBuffer->bytesLeft = 0;
2237 }
2238 MPI_dataset->file_offset += (MPI_Offset)prev_rows * MPI_dataset->column_offset;
2239 if ((mpi_code = MPI_File_set_view(MPI_dataset->MPI_file, MPI_dataset->file_offset, MPI_BYTE, MPI_BYTE, "native", MPI_INFO_NULL)) != MPI_SUCCESS) {
2240 SDDS_MPI_GOTO_ERROR(stderr, "Unable to set view for read binary rows", mpi_code, 0);
2241 SDDS_SetError("Unable to set view for read binary rows");
2242 return 0;
2243 }
2244 if (!MPI_dataset->collective_io || !master_read) {
2245 for (j = 0; j < n_rows; j++) {
2246 if (!SDDS_MPI_ReadBinaryRow(SDDS_dataset, j, 0)) {
2247 SDDS_dataset->n_rows = j - 1;
2248 if (SDDS_dataset->autoRecover) {
2250 SDDS_SwapEndsColumnData(SDDS_dataset);
2251 return (SDDS_dataset->page_number = MPI_dataset->n_page);
2252 }
2253 SDDS_SetError("Unable to read page--error reading data row (SDDS_MPI_ReadNonNativeBinaryPage)");
2254 SDDS_SetReadRecoveryMode(SDDS_dataset, 1);
2255 return (0);
2256 }
2257 }
2258 SDDS_dataset->n_rows = j;
2259 } else {
2260 MPI_dataset->n_rows = SDDS_dataset->n_rows = n_rows;
2261 if (!SDDS_MPI_CollectiveReadByRow(SDDS_dataset))
2262 return 0;
2263 }
2264 MPI_dataset->file_offset = offset + MPI_dataset->total_rows * MPI_dataset->column_offset;
2265 }
2266 SDDS_SwapEndsColumnData(SDDS_dataset);
2267 MPI_dataset->n_page++;
2268 MPI_Barrier(MPI_dataset->comm);
2269 return (SDDS_dataset->page_number = MPI_dataset->n_page);
2270}
2271
2272/**
2273 * @brief Buffers and reads the non-native binary title data from an SDDS dataset using MPI.
2274 *
2275 * This function initializes and manages a buffered read for the title section of a non-native binary SDDS dataset.
2276 * It reads the total number of rows, parameters, and arrays from the binary file buffer. The function handles
2277 * memory allocation for the buffer, byte swapping for endianness, and initializes the SDDS page structure.
2278 *
2279 * @param[in,out] SDDS_dataset Pointer to the `SDDS_DATASET` structure where the title data will be stored.
2280 *
2281 * @return int32_t Returns `1` on successful read, `0` on failure, and `-1` if the end of the file is reached.
2282 */
2284 SDDS_FILEBUFFER *fBuffer = NULL;
2285 MPI_DATASET *MPI_dataset = NULL;
2286 int32_t ret_val;
2287 int32_t total_rows;
2288
2289 MPI_dataset = SDDS_dataset->MPI_dataset;
2290 fBuffer = &(SDDS_dataset->titleBuffer);
2291 if (!fBuffer->buffer) {
2292 fBuffer->bufferSize = defaultTitleBufferSize;
2293 if (!(fBuffer->buffer = fBuffer->data = SDDS_Malloc(sizeof(char) * (fBuffer->bufferSize + 1)))) {
2294 SDDS_SetError("Unable to do buffered read--allocation failure(SDDS_MPI_ReadNonNativeBinaryTitle)");
2295 return 0;
2296 }
2297 fBuffer->bytesLeft = 0;
2298 }
2299 if (fBuffer->bytesLeft > 0) {
2300 /* discard the extra data for reading next page */
2301 fBuffer->data[0] = 0;
2302 fBuffer->bytesLeft = 0;
2303 }
2304 if ((ret_val = SDDS_MPI_BufferedRead((void *)&(total_rows), sizeof(int32_t), SDDS_dataset, fBuffer)) < 0)
2305 return -1;
2306 SDDS_SwapLong(&(total_rows));
2307 if (total_rows == INT32_MIN) {
2308 if ((ret_val = SDDS_MPI_BufferedRead((void *)&(MPI_dataset->total_rows), sizeof(int64_t), SDDS_dataset, fBuffer)) < 0)
2309 return -1;
2310 } else {
2311 MPI_dataset->total_rows = total_rows;
2312 }
2313 if (!ret_val)
2314 return 0;
2315 if (!SDDS_StartPage(SDDS_dataset, 0)) {
2316 SDDS_SetError("Unable to read page--couldn't start page (SDDS_MPI_BufferedReadNonNativeBinaryTitle)");
2317 return (0);
2318 }
2319 /*read parameters */
2320 if (!SDDS_MPI_ReadNonNativeBinaryParameters(SDDS_dataset, fBuffer)) {
2321 SDDS_SetError("Unable to read page--parameter reading error (SDDS_MPI_BufferedNonNativeReadTitle)");
2322 return (0);
2323 }
2324 /*read arrays */
2325 if (!SDDS_MPI_ReadNonNativeBinaryArrays(SDDS_dataset, fBuffer)) {
2326 SDDS_SetError("Unable to read page--array reading error (SDDS_MPI_BufferedNonNativeReadTitle)");
2327 return (0);
2328 }
2329
2330 return 1;
2331}
2332
2333/*obtain the offset of n_rows, parameters and arrays which are written by master processor */
2334/**
2335 * @brief Calculates the byte offset for the title section in a non-native binary SDDS dataset.
2336 *
2337 * This function computes the total byte offset required to read the number of rows, parameters, and arrays
2338 * defined in the SDDS dataset. It accounts for different data types, including strings, and variable-length
2339 * parameters and arrays. The calculated offset is used to position the MPI file view correctly for reading.
2340 *
2341 * @param[in] SDDS_dataset Pointer to the `SDDS_DATASET` structure containing the dataset layout and data.
2342 *
2343 * @return MPI_Offset The total byte offset of the title section in the binary file.
2344 */
2345MPI_Offset SDDS_MPI_GetTitleOffset(SDDS_DATASET *SDDS_dataset) {
2346 int64_t i, j;
2347 MPI_Offset offset = 0;
2348 SDDS_LAYOUT *layout;
2349
2350 layout = &(SDDS_dataset->layout);
2351 offset += sizeof(int32_t);
2352 if (SDDS_dataset->n_rows > INT32_MAX) {
2353 offset += sizeof(int64_t);
2354 }
2355 for (i = 0; i < layout->n_parameters; i++) {
2356 if (layout->parameter_definition[i].fixed_value)
2357 continue;
2358 if (layout->parameter_definition[i].type == SDDS_STRING) {
2359 if (*(char **)SDDS_dataset->parameter[i])
2360 offset += sizeof(int32_t) + sizeof(char) * strlen(*(char **)SDDS_dataset->parameter[i]);
2361 else
2362 offset += sizeof(int32_t); /*write length 0 for NULL string */
2363 } else {
2364 offset += SDDS_type_size[layout->parameter_definition[i].type - 1];
2365 }
2366 }
2367 for (i = 0; i < layout->n_arrays; i++) {
2368 if (!(SDDS_dataset->array[i].dimension)) {
2369 offset += layout->array_definition[i].dimensions * sizeof(int32_t);
2370 continue;
2371 }
2372 offset += sizeof(*(SDDS_dataset->array)[i].dimension) * layout->array_definition[i].dimensions;
2373 if (layout->array_definition[i].type == SDDS_STRING) {
2374 for (j = 0; j < SDDS_dataset->array[i].elements; j++) {
2375 if (((char **)SDDS_dataset->array[i].data)[j])
2376 offset += sizeof(int32_t) + sizeof(char) * strlen(((char **)SDDS_dataset->array[i].data)[j]);
2377 else
2378 offset += sizeof(int32_t);
2379 }
2380 } else {
2381 offset += SDDS_type_size[layout->array_definition[i].type - 1] * SDDS_dataset->array[i].elements;
2382 }
2383 }
2384 return offset;
2385}
2386
2387/*use a small size of buffer to read the total_rows, parameters and arrays of each page */
2388/**
2389 * @brief Buffers and reads the binary title data from an SDDS dataset using MPI.
2390 *
2391 * This function initializes and manages a buffered read for the title section of a native binary SDDS dataset.
2392 * It reads the total number of rows, parameters, and arrays from the binary file buffer. The function handles
2393 * memory allocation for the buffer and initializes the SDDS page structure.
2394 *
2395 * @param[in,out] SDDS_dataset Pointer to the `SDDS_DATASET` structure where the title data will be stored.
2396 *
2397 * @return int32_t Returns `1` on successful read, `0` on failure, and `-1` if the end of the file is reached.
2398 */
2400 SDDS_FILEBUFFER *fBuffer = NULL;
2401 MPI_DATASET *MPI_dataset = NULL;
2402 int32_t ret_val;
2403 int32_t total_rows;
2404
2405 MPI_dataset = SDDS_dataset->MPI_dataset;
2406 fBuffer = &(SDDS_dataset->titleBuffer);
2407 if (!fBuffer->buffer) {
2408 fBuffer->bufferSize = defaultTitleBufferSize;
2409 if (!(fBuffer->buffer = fBuffer->data = SDDS_Malloc(sizeof(char) * (fBuffer->bufferSize + 1)))) {
2410 SDDS_SetError("Unable to do buffered read--allocation failure(SDDS_MPI_ReadBinaryTitle)");
2411 return 0;
2412 }
2413 fBuffer->bytesLeft = 0;
2414 }
2415 if (fBuffer->bytesLeft > 0) {
2416 /* discard the extra data for reading next page */
2417 fBuffer->data[0] = 0;
2418 fBuffer->bytesLeft = 0;
2419 }
2420 if ((ret_val = SDDS_MPI_BufferedRead((void *)&(total_rows), sizeof(int32_t), SDDS_dataset, fBuffer)) < 0)
2421 return -1;
2422 if (total_rows == INT32_MIN) {
2423 if ((ret_val = SDDS_MPI_BufferedRead((void *)&(MPI_dataset->total_rows), sizeof(int64_t), SDDS_dataset, fBuffer)) < 0)
2424 return -1;
2425 } else {
2426 MPI_dataset->total_rows = total_rows;
2427 }
2428 if (!ret_val)
2429 return 0;
2430 if (!SDDS_StartPage(SDDS_dataset, 0)) {
2431 SDDS_SetError("Unable to read page--couldn't start page (SDDS_MPI_BufferedReadBinaryTitle)");
2432 return (0);
2433 }
2434 /*read parameters */
2435 if (!SDDS_MPI_ReadBinaryParameters(SDDS_dataset, fBuffer)) {
2436 SDDS_SetError("Unable to read page--parameter reading error (SDDS_MPI_BufferedReadTitle)");
2437 return (0);
2438 }
2439 /*read arrays */
2440 if (!SDDS_MPI_ReadBinaryArrays(SDDS_dataset, fBuffer)) {
2441 SDDS_SetError("Unable to read page--array reading error (SDDS_MPI_BufferedReadTitle)");
2442 return (0);
2443 }
2444
2445 return 1;
2446}
2447
2448/**
2449 * @brief Writes SDDS dataset rows collectively by row using MPI parallel I/O.
2450 *
2451 * This function performs a collective write operation where each MPI process writes its assigned rows of the SDDS dataset.
2452 * It handles buffering of row data, ensures synchronization among MPI processes, and manages error handling.
2453 * The function supports only binary string types for non-string data and flushes the buffer upon completion.
2454 *
2455 * @param[in,out] SDDS_dataset Pointer to the `SDDS_DATASET` structure containing the data to be written.
2456 *
2457 * @return int32_t Returns `1` on successful write, `0` on failure.
2458 */
2460 MPI_DATASET *MPI_dataset;
2461 SDDS_LAYOUT *layout;
2462 int32_t mpi_code, type, size;
2463 int64_t i, j, n_rows, min_rows, writeBytes;
2464 SDDS_FILEBUFFER *fBuffer;
2465
2466#if MPI_DEBUG
2467 logDebug("SDDS_MPI_CollectiveWriteByRow", SDDS_dataset);
2468#endif
2469
2470 MPI_dataset = SDDS_dataset->MPI_dataset;
2471 layout = &(SDDS_dataset->layout);
2472 fBuffer = &(SDDS_dataset->fBuffer);
2473 n_rows = SDDS_dataset->n_rows;
2474 MPI_Allreduce(&n_rows, &min_rows, 1, MPI_INT64_T, MPI_MIN, MPI_dataset->comm);
2475 for (i = 0; i < min_rows; i++) {
2476 for (j = 0; j < layout->n_columns; j++) {
2477 type = layout->column_definition[j].type;
2478 size = SDDS_type_size[type - 1];
2479 if (type == SDDS_STRING) {
2480 SDDS_SetError("Can not write binary string in collective io.");
2481 return 0;
2482 }
2483 if (!SDDS_MPI_BufferedWriteAll((char *)SDDS_dataset->data[j] + i * size, size, SDDS_dataset))
2484 return 0;
2485 }
2486 }
2487
2488 if ((writeBytes = fBuffer->bufferSize - fBuffer->bytesLeft)) {
2489 if (writeBytes < 0) {
2490 SDDS_SetError("Unable to flush buffer: negative byte count (SDDS_FlushBuffer).");
2491 return 0;
2492 }
2493 if ((mpi_code = MPI_File_write_all(MPI_dataset->MPI_file, fBuffer->buffer, writeBytes, MPI_BYTE, MPI_STATUS_IGNORE)) != MPI_SUCCESS) {
2494 SDDS_MPI_GOTO_ERROR(stderr, "SDDS_MPI_FlushBuffer(MPI_File_write_at failed)", mpi_code, 0);
2495 return 0;
2496 }
2497 fBuffer->bytesLeft = fBuffer->bufferSize;
2498 fBuffer->data = fBuffer->buffer;
2499 }
2500
2501 for (i = min_rows; i < n_rows; i++)
2502 if (!SDDS_MPI_WriteBinaryRow(SDDS_dataset, i))
2503 return 0;
2504 if (!SDDS_MPI_FlushBuffer(SDDS_dataset))
2505 return 0;
2506 return 1;
2507}
2508
2509/**
2510 * @brief Writes non-native binary SDDS dataset rows collectively by row using MPI parallel I/O.
2511 *
2512 * This function performs a collective write operation for non-native binary SDDS datasets, where each MPI process writes
2513 * its assigned rows. It handles buffering of row data, ensures synchronization among MPI processes, and manages error handling.
2514 * The function supports only binary string types for non-string data and flushes the buffer upon completion.
2515 *
2516 * @param[in,out] SDDS_dataset Pointer to the `SDDS_DATASET` structure containing the data to be written.
2517 *
2518 * @return int32_t Returns `1` on successful write, `0` on failure.
2519 */
2521 MPI_DATASET *MPI_dataset;
2522 SDDS_LAYOUT *layout;
2523 int32_t mpi_code, type, size;
2524 int64_t i, j, n_rows, min_rows, writeBytes;
2525 SDDS_FILEBUFFER *fBuffer;
2526
2527#if MPI_DEBUG
2528 logDebug("SDDS_MPI_CollectiveWriteNonNativeByRow", SDDS_dataset);
2529#endif
2530
2531 MPI_dataset = SDDS_dataset->MPI_dataset;
2532 layout = &(SDDS_dataset->layout);
2533 fBuffer = &(SDDS_dataset->fBuffer);
2534 n_rows = SDDS_dataset->n_rows;
2535 MPI_Allreduce(&n_rows, &min_rows, 1, MPI_INT64_T, MPI_MIN, MPI_dataset->comm);
2536 for (i = 0; i < min_rows; i++) {
2537 for (j = 0; j < layout->n_columns; j++) {
2538 type = layout->column_definition[j].type;
2539 size = SDDS_type_size[type - 1];
2540 if (type == SDDS_STRING) {
2541 SDDS_SetError("Can not write binary string in collective io.");
2542 return 0;
2543 }
2544 if (!SDDS_MPI_BufferedWriteAll((char *)SDDS_dataset->data[j] + i * size, size, SDDS_dataset))
2545 return 0;
2546 }
2547 }
2548
2549 if ((writeBytes = fBuffer->bufferSize - fBuffer->bytesLeft)) {
2550 if (writeBytes < 0) {
2551 SDDS_SetError("Unable to flush buffer: negative byte count (SDDS_FlushBuffer).");
2552 return 0;
2553 }
2554 if ((mpi_code = MPI_File_write_all(MPI_dataset->MPI_file, fBuffer->buffer, writeBytes, MPI_BYTE, MPI_STATUS_IGNORE)) != MPI_SUCCESS) {
2555 SDDS_MPI_GOTO_ERROR(stderr, "SDDS_MPI_FlushBuffer(MPI_File_write_at failed)", mpi_code, 0);
2556 return 0;
2557 }
2558 fBuffer->bytesLeft = fBuffer->bufferSize;
2559 fBuffer->data = fBuffer->buffer;
2560 }
2561
2562 for (i = min_rows; i < n_rows; i++)
2563 if (!SDDS_MPI_WriteNonNativeBinaryRow(SDDS_dataset, i))
2564 return 0;
2565 if (!SDDS_MPI_FlushBuffer(SDDS_dataset))
2566 return 0;
2567 return 1;
2568}
2569
2570/**
2571 * @brief Reads SDDS dataset rows collectively by row using MPI parallel I/O.
2572 *
2573 * This function performs a collective read operation where each MPI process reads its assigned rows of the SDDS dataset.
2574 * It handles buffering of row data, ensures synchronization among MPI processes, and manages error handling.
2575 * The function supports only binary string types for non-string data and flushes the buffer upon completion.
2576 *
2577 * @param[in,out] SDDS_dataset Pointer to the `SDDS_DATASET` structure where the data will be stored.
2578 *
2579 * @return int32_t Returns `1` on successful read, `0` on failure.
2580 */
2582
2583 SDDS_FILEBUFFER *fBuffer;
2584 MPI_DATASET *MPI_dataset;
2585 int32_t type, size;
2586 int64_t min_rows, i, j;
2587 SDDS_LAYOUT *layout;
2588
2589 MPI_dataset = SDDS_dataset->MPI_dataset;
2590 fBuffer = &(SDDS_dataset->fBuffer);
2591 layout = &(SDDS_dataset->layout);
2592
2593 if (!MPI_dataset->master_read) {
2594 SDDS_SetError("Cannot read row with collective io when master is not reading the data.");
2595 return 0;
2596 }
2597 min_rows = MPI_dataset->total_rows / MPI_dataset->n_processors;
2598 for (i = 0; i < min_rows; i++) {
2599 for (j = 0; j < layout->n_columns; j++) {
2600 type = layout->column_definition[j].type;
2601 size = SDDS_type_size[type - 1];
2602 if (type == SDDS_STRING) {
2603 SDDS_SetError("Can not write binary string in collective io.");
2604 return 0;
2605 }
2606 if (!SDDS_MPI_BufferedReadAll((char *)SDDS_dataset->data[j] + i * size, size, SDDS_dataset, fBuffer))
2607 return 0;
2608 }
2609 }
2610 for (i = min_rows; i < MPI_dataset->n_rows; i++)
2611 if (!SDDS_MPI_ReadBinaryRow(SDDS_dataset, i, 0))
2612 return 0;
2613
2614 return 1;
2615}
SDDS (Self Describing Data Set) Data Types Definitions and Function Prototypes.
int32_t SDDS_MPI_WriteBinaryParameters(SDDS_DATASET *SDDS_dataset)
Write binary parameters of an SDDS dataset using MPI.
int32_t SDDS_MPI_ReadNonNativeBinaryRow(SDDS_DATASET *SDDS_dataset, int64_t row, int32_t skip)
Reads a single non-native binary row from a binary file buffer into the SDDS dataset using MPI.
int32_t SDDS_MPI_BufferedReadNonNativeBinaryTitle(SDDS_DATASET *SDDS_dataset)
Buffers and reads the non-native binary title data from an SDDS dataset using MPI.
int32_t SDDS_MPI_WriteBinaryRow(SDDS_DATASET *SDDS_dataset, int64_t row)
Write a binary row to an SDDS dataset using MPI.
int32_t SDDS_MPI_CollectiveWriteNonNativeByRow(SDDS_DATASET *SDDS_dataset)
Writes non-native binary SDDS dataset rows collectively by row using MPI parallel I/O.
char * SDDS_MPI_ReadBinaryString(SDDS_DATASET *SDDS_dataset, SDDS_FILEBUFFER *fBuffer, int32_t skip)
Read a binary string from the SDDS dataset using MPI.
int32_t SDDS_MPI_FlushBuffer(SDDS_DATASET *SDDS_dataset)
Flush the buffer by writing any remaining data to the MPI file.
int32_t SDDS_MPI_WriteBinaryString(SDDS_DATASET *SDDS_dataset, char *string)
Write a binary string to an SDDS dataset using MPI.
int32_t SDDS_MPI_WriteContinuousBinaryPage(SDDS_DATASET *SDDS_dataset)
Write a continuous binary page of the SDDS dataset using MPI.
int32_t SDDS_MPI_BufferedReadBinaryTitle(SDDS_DATASET *SDDS_dataset)
Buffers and reads the binary title data from an SDDS dataset using MPI.
void SDDS_StringTuncated(void)
Increment the truncated strings counter.
void SDDS_MPI_SetFileSync(short value)
Set the file synchronization flag.
int32_t SDDS_SetDefaultStringLength(int32_t newValue)
Set the default string length for SDDS.
int32_t SDDS_MPI_ReadNonNativePage(SDDS_DATASET *SDDS_dataset)
Reads a non-native binary page from an SDDS dataset using MPI parallel I/O.
int32_t SDDS_MPI_ReadBinaryParameters(SDDS_DATASET *SDDS_dataset, SDDS_FILEBUFFER *fBuffer)
Read binary parameters from the SDDS dataset using MPI.
int32_t SDDS_MPI_WriteNonNativeBinaryRow(SDDS_DATASET *SDDS_dataset, int64_t row)
Write a non-native binary row to an SDDS dataset using MPI.
int32_t SDDS_MPI_BufferedRead(void *target, int64_t targetSize, SDDS_DATASET *SDDS_dataset, SDDS_FILEBUFFER *fBuffer)
Buffered read from an SDDS dataset using MPI.
MPI_Offset SDDS_MPI_GetTitleOffset(SDDS_DATASET *SDDS_dataset)
Calculates the byte offset for the title section in a non-native binary SDDS dataset.
int32_t SDDS_MPI_CollectiveReadByRow(SDDS_DATASET *SDDS_dataset)
Reads SDDS dataset rows collectively by row using MPI parallel I/O.
int64_t SDDS_MPI_CountRowsOfInterest(SDDS_DATASET *SDDS_dataset, int64_t start_row, int64_t end_row)
Count the number of rows marked as "of interest" within a specified range.
int32_t SDDS_SetDefaultWriteBufferSize(int32_t newSize)
Set the default write buffer size for SDDS.
MPI_Offset SDDS_MPI_Get_Column_Size(SDDS_DATASET *SDDS_dataset)
Get the total size of all columns in an SDDS dataset.
int32_t SDDS_CheckStringTruncated(void)
Check the number of truncated strings.
int64_t SDDS_MPI_GetTotalRows(SDDS_DATASET *SDDS_dataset)
Get the total number of rows across all MPI processes.
int32_t SDDS_MPI_WriteBinaryPage(SDDS_DATASET *SDDS_dataset)
Write an SDDS binary page using MPI.
int32_t SDDS_MPI_WriteNonNativeBinaryString(SDDS_DATASET *SDDS_dataset, char *string)
Write a non-native binary string to an SDDS dataset using MPI.
int32_t SDDS_MPI_ReadNonNativeBinaryParameters(SDDS_DATASET *SDDS_dataset, SDDS_FILEBUFFER *fBuffer)
Read non-native binary parameters from the SDDS dataset using MPI.
int32_t SDDS_MPI_WriteBinaryArrays(SDDS_DATASET *SDDS_dataset)
Write binary arrays of an SDDS dataset using MPI.
int32_t SDDS_MPI_WriteNonNativeBinaryArrays(SDDS_DATASET *SDDS_dataset)
Write non-native binary arrays of an SDDS dataset using MPI.
int32_t SDDS_MPI_BufferedWrite(void *target, int64_t targetSize, SDDS_DATASET *SDDS_dataset)
Buffered write to an SDDS dataset using MPI.
int32_t SDDS_MPI_ReadBinaryArrays(SDDS_DATASET *SDDS_dataset, SDDS_FILEBUFFER *fBuffer)
Read binary arrays from the SDDS dataset using MPI.
int32_t SDDS_MPI_WriteNonNativeBinaryParameters(SDDS_DATASET *SDDS_dataset)
Write non-native binary parameters of an SDDS dataset using MPI.
int32_t SDDS_MPI_ReadBinaryPage(SDDS_DATASET *SDDS_dataset)
Reads a binary page from an SDDS dataset using MPI parallel I/O.
int32_t SDDS_MPI_BufferedWriteAll(void *target, int64_t targetSize, SDDS_DATASET *SDDS_dataset)
Buffered write all to an SDDS dataset using MPI.
int32_t SDDS_SetDefaultReadBufferSize(int32_t newSize)
Set the default read buffer size for SDDS.
int32_t SDDS_MPI_BroadcastTitleData(SDDS_DATASET *SDDS_dataset)
Broadcasts the title data (parameters and arrays) of the SDDS dataset to all MPI processes.
int32_t SDDS_MPI_ReadBinaryRow(SDDS_DATASET *SDDS_dataset, int64_t row, int32_t skip)
Read a binary row from the SDDS dataset using MPI.
int32_t SDDS_MPI_BufferedReadAll(void *target, int64_t targetSize, SDDS_DATASET *SDDS_dataset, SDDS_FILEBUFFER *fBuffer)
Buffered read all from an SDDS dataset using MPI.
int32_t SDDS_MPI_ReadNonNativeBinaryArrays(SDDS_DATASET *SDDS_dataset, SDDS_FILEBUFFER *fBuffer)
Reads non-native binary arrays from a binary file buffer into the SDDS dataset using MPI.
int32_t SDDS_MPI_CollectiveWriteByRow(SDDS_DATASET *SDDS_dataset)
Writes SDDS dataset rows collectively by row using MPI parallel I/O.
int32_t SDDS_SetDefaultTitleBufferSize(int32_t newSize)
Set the default title buffer size for SDDS.
void SDDS_MPI_SetWriteKludgeUsleep(long value)
Set the write kludge usleep duration.
int32_t SDDS_MPI_ReadNonNativePageSparse(SDDS_DATASET *SDDS_dataset, uint32_t mode)
Reads a sparse non-native binary page from an SDDS dataset using MPI parallel I/O.
int32_t SDDS_MPI_WriteNonNativeBinaryPage(SDDS_DATASET *SDDS_dataset)
Write a non-native binary page of the SDDS dataset using MPI.
int32_t SDDS_MPI_ReadNonNativeBinaryPage(SDDS_DATASET *SDDS_dataset)
Reads a non-native binary page from an SDDS dataset using MPI parallel I/O.
char * SDDS_MPI_ReadNonNativeBinaryString(SDDS_DATASET *SDDS_dataset, SDDS_FILEBUFFER *fBuffer, int32_t skip)
Read a non-native binary string from the SDDS dataset using MPI.
int32_t SDDS_ScanData(char *string, int32_t type, int32_t field_length, void *data, int64_t index, int32_t is_parameter)
Scans a string and saves the parsed value into a data pointer according to the specified data type.
int32_t SDDS_SwapEndsColumnData(SDDS_DATASET *SDDSin)
Swaps the endianness of the column data in an SDDS dataset.
void SDDS_SwapLong64(int64_t *data)
Swaps the endianness of a 64-bit integer.
void SDDS_SetReadRecoveryMode(SDDS_DATASET *SDDS_dataset, int32_t mode)
Sets the read recovery mode for an SDDS dataset.
int32_t SDDS_SwapEndsArrayData(SDDS_DATASET *SDDSin)
Swaps the endianness of the array data in an SDDS dataset.
int32_t SDDS_SwapEndsParameterData(SDDS_DATASET *SDDSin)
Swaps the endianness of the parameter data in an SDDS dataset.
void SDDS_SwapLong(int32_t *data)
Swaps the endianness of a 32-bit integer.
int32_t SDDS_type_size[SDDS_NUM_TYPES]
Array of sizes for each supported data type.
Definition SDDS_data.c:62
int32_t SDDS_LengthenTable(SDDS_DATASET *SDDS_dataset, int64_t n_additional_rows)
int32_t SDDS_StartPage(SDDS_DATASET *SDDS_dataset, int64_t expected_n_rows)
int64_t SDDS_CountRowsOfInterest(SDDS_DATASET *SDDS_dataset)
Counts the number of rows marked as "of interest" in the current data table.
int64_t SDDS_GetRowLimit()
void SDDS_SetError(char *error_text)
Records an error message in the SDDS error stack.
Definition SDDS_utils.c:379
int32_t SDDS_FreeArrayDefinition(ARRAY_DEFINITION *source)
Frees memory allocated for an array definition.
int32_t SDDS_CheckDataset(SDDS_DATASET *SDDS_dataset, const char *caller)
Validates the SDDS dataset pointer.
Definition SDDS_utils.c:552
void * SDDS_Malloc(size_t size)
Allocates memory of a specified size.
Definition SDDS_utils.c:639
void SDDS_ClearErrors()
Clears all recorded error messages from the SDDS error stack.
Definition SDDS_utils.c:318
ARRAY_DEFINITION * SDDS_CopyArrayDefinition(ARRAY_DEFINITION **target, ARRAY_DEFINITION *source)
Creates a copy of an array definition.
int32_t SDDS_IsBigEndianMachine()
Determines whether the current machine uses big-endian byte ordering.
void * SDDS_Realloc(void *old_ptr, size_t new_size)
Reallocates memory to a new size.
Definition SDDS_utils.c:677
void SDDS_MPI_GOTO_ERROR(FILE *fp, char *str, int32_t mpierr, int32_t exit_code)
Handles MPI errors by printing an error message and optionally exiting.
#define SDDS_STRING
Identifier for the string data type.
Definition SDDStypes.h:85
void usleepSystemIndependent(long usec)
Sleep for a given number of microseconds, system-independently.