Мобильное программирование приложений реального времени в стандарте POSIX

       

Функции асинхронного ввода/вывода


Основными операциями асинхронного ввода/вывода являются чтение и запись данных (см. листинг 7.1).

#include <aio.h> int aio_read (struct aiocb *aiocbp); int aio_write (struct aiocb *aiocbp);

Листинг 7.1. Описание функций асинхронного чтения и записи. (html, txt)

Функция aio_read() инициирует запрос на чтение aiocbp->aio_nbytes байт из файла, ассоциированного с дескриптором aiocbp->aio_fildes, начиная с абсолютной позиции aiocbp->aio_offset, в буфер с адресом aiocbp->aio_buf. Возврат из функции произойдет тогда, когда запрос будет принят к исполнению или поставлен в очередь; при этом нормальный результат равен нулю, а значение -1 (в совокупности с errno) свидетельствует об ошибке (например, исчерпаны ресурсы и запрос не удалось поставить в очередь).

Функция aio_write() осуществляет соответствующие действия для записи данных. Если для файлового дескриптора aiocbp->aio_fildes установлен флаг O_APPEND, запись будет производиться в конец файла.

Функция lio_listio() (см. листинг 7.2) позволяет за один вызов инициировать (поставить в очередь) список запросов на чтение и/или запись данных.

#include <aio.h> int lio_listio ( int mode, struct aiocb *restrict const listio [restrict], int nent, struct sigevent *restrict sigev);

Листинг 7.2. Описание функции lio_listio(). (html, txt)

Элементы массива listio [] специфицируют отдельные запросы. В полях aio_lio_opcode указуемых структур типа aiocb могут располагаться значения LIO_READ (запрос на чтение), LIO_WRITE (запрос на запись) или LIO_NOP (пустой запрос). Остальные элементы указуемых структур интерпретируются в соответствии со смыслом запроса (можно считать, что адреса структур передаются в качестве аргументов функциям aio_read() или aio_write()). Число элементов в массиве listio [] задается аргументом nent.

Значение аргумента mode определяет способ обработки списка запросов – синхронный (LIO_WAIT) или асинхронный (LIO_NOWAIT). В первом случае возврат из вызова lio_listio() произойдет только после завершения всех заказанных операций ввода/вывода; при этом аргумент sigev игнорируется.
Во втором случае возврат из lio_listio() произойдет немедленно, а по завершении всех операций ввода/вывода в соответствии со значением аргумента sigev будет сгенерировано асинхронное уведомление.

Нормальный результат выполнения функции lio_listio() равен нулю, но для двух описанных случаев он имеет разный смысл – успешное завершение всех операций ввода/вывода или только успешная постановка запросов в очередь соответственно. Во втором случае часть операций ввода/вывода может завершиться неудачей, и с каждой из них придется разбираться индивидуально. Подобное разбирательство позволяют осуществить функции aio_return() и aio_error() (см. листинг 7.3).

#include <aio.h>

ssize_t aio_return ( struct aiocb *aiocbp);

int aio_error ( const struct aiocb *aiocbp);

Листинг 7.3. Описание функций aio_return() и aio_error(). (html, txt)



Аргументом этих функций служит адрес управляющего блока, который играет в данном случае роль идентификатора запроса. Функция aio_return() выдает значение, которое вернула бы соответствующая функция обычного ввода/вывода (если операция ввода/вывода в момент опроса не была завершена, возвращаемое значение, разумеется, не определено). К идентификатору данного запроса функцию aio_return() можно применить ровно один раз; последующие вызовы aio_return() и aio_error() могут завершиться неудачей. (Конечно, если в той же указуемой структуре типа aiocb сформировать новый запрос и выполнить его, возможность корректного вызова функций aio_return() и aio_error() появляется снова.)

Результатом вызова aio_error() служит значение, которое установила бы в переменную errno соответствующая функция обычного ввода/вывода (или, если операция еще не завершена, результат равен EINPROGRESS). Если операция асинхронного ввода/вывода завершилась успешно, aio_error() вернет нулевой результат. Функцию aio_error() к идентификатору данного запроса можно применять произвольное число раз, и до завершения его выполнения, и после.

Ожидающие обработки запросы на асинхронный ввод/вывод можно аннулировать, воспользовавшись функцией aio_cancel() (см.


листинг 7.4).

#include <aio.h> int aio_cancel (int fildes, struct aiocb *aiocbp);

Листинг 7.4. Описание функции aio_cancel(). (html, txt)

Функция aio_cancel() пытается аннулировать один ( если значение аргумента aiocbp отлично от NULL) или все ожидающие обработки запросы, в которых фигурирует файловый дескриптор fildes. После аннулирования генерируется соответствующее асинхронное уведомление, статус ошибки устанавливается равным ECANCELED, а в качестве возвращаемого значения выдается -1.

Если запрос не удается аннулировать (возможные причины этого, вообще говоря, зависят от реализации), он выполняется стандартным образом.

Результат вызова aio_cancel() равен AIO_CANCELED, если все указанные запросы аннулированы. Если хотя бы один из запросов уже выполняется, он не может быть аннулирован, и тогда в качестве результата возвращается значение AIO_NOTCANCELED (а со статусом запросов, как и для функции lio_listio(), нужно разбираться индивидуально, пользуясь функцией aio_error()). Результат AIO_ALLDONE свидетельствует о том, что выполнение всех указанных запросов не только начато, но и уже завершено. Во всех остальных случаях результат вызова aio_cancel() равен -1.



Какой именно, как обычно, нужно выяснять индивидуально, пользуясь функциями aio_error() и aio_return().

Описываемая далее группа функций (см. листинг 7.6) предназначена для согласования состояния буферизованных и хранимых в долговременной памяти данных, обеспечения целостности данных и файлов. Подобные функции необходимы в системах обработки транзакций, чтобы гарантировать определенное состояние долговременной памяти, даже если система аварийно завершит работу. Один элемент из этой группы – функцию msync() – мы уже рассматривали.

#include <unistd.h> void sync (void);

#include <unistd.h> int fsync (int fildes);

#include <unistd.h> int fdatasync (int fildes);

#include <aio.h> int aio_fsync (int op, struct aiocb *aiocbp);

Листинг 7.6. Описание функций синхронизации оперативной и долговременной памяти.

Функция sync() имеет глобальный характер – она планирует запись в файловые системы всех измененных данных.

Функция fsync() обеспечивает запись измененных данных в файл, заданный дескриптором fildes. Более того, после успешного возврата из функции (с нулевым результатом) окажутся выполненными все стоявшие в очереди к этому файлу запросы на ввод/вывод с гарантией целостности файла.

Функция fdatasync() аналогична fsync(), но гарантируется лишь целостность данных (см. курс [1]).

Функция aio_fsync() осуществляет "асинхронную синхронизацию" файла. Иными словами, порождается запрос, выполнение которого при значении аргумента op, равном O_DSYNC, эквивалентно вызову fdatasync (aiocbp->aio_fildes), а при op, равном O_SYNC, – fsync (aiocbp->aio_fildes). Как и для других операций асинхронного ввода/вывода, адрес управляющего блока может использоваться в качестве аргумента функций aio_error() и aio_return(), а после завершения генерируется асинхронное уведомление в соответствии со значением элемента aiocbp->aio_sigevent. Все остальные поля указуемой структуры типа aiocb игнорируются.

Проиллюстрируем использование функций асинхронного ввода/вывода программой, подсчитывающей суммарное число строк в совокупности файлов (см.


листинг 7.7).

/* * * * * * * * * * * * * * * * * * */ /* Программа подсчитывает суммарное */ /* число строк – в файлах аргументах */ /* командной строки. */ /* Если аргументы отсутствуют или в */ /* качестве имени задан минус, */ /* читается стандартный ввод. */ /* * * * * * * * * * * * * * * * * * */

#include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <fcntl.h> #include <signal.h> #include <string.h> #include <aio.h> #include <assert.h> #include <errno.h>

int main (int argc, char *argv []) { /* Указатель на массив */ /* указателей на управляющие */ /* блоки запросов операций */ /* асинхронного чтения */ struct aiocb **lio_ptr; // Общее число строк в файлах long nlines = 0; // Признак повторного указания // стандартного ввода int dup_stdin = 0; int i;

/* Сведем случай с отсутствием */ /* аргументов к общему, */ /* воспользовавшись одним из */ /* немногих стандартизованных */ /* файлов */ if (argc == 1) { argv [0] = "-"; } else { argv [0] = "/dev/null"; }

/* Зарезервируем память, */ /* откроем заданные файлы */ /* и сформируем начальный список */ /* запросов на чтение */ assert ((lio_ptr = (struct aiocb **) malloc (sizeof (struct aiocb *) * argc)) != NULL);

for (i = 0; i < argc; i++) { assert ((lio_ptr [i] = (struct aiocb *) malloc (sizeof (struct aiocb))) != NULL);

if (strcmp (argv [i], "-") == 0) { if (dup_stdin == 0) { lio_ptr [i]->aio_fildes = STDIN_FILENO; dup_stdin = 1; } else { lio_ptr [i]->aio_fildes = fcntl (STDIN_FILENO, F_DUPFD, 0); } } else if ((lio_ptr [i]->aio_fildes = open (argv [i], O_RDONLY)) == -1) { perror ("OPEN"); free (lio_ptr [i]); lio_ptr [i] = NULL; continue; }

lio_ptr [i]->aio_offset = 0; assert ((lio_ptr [i]->aio_buf = malloc(BUFSIZ)) != NULL); lio_ptr [i]->aio_nbytes = BUFSIZ; lio_ptr [i]->aio_reqprio = 0; lio_ptr [i]->aio_sigevent.sigev_notify = SIGEV_NONE; lio_ptr [i]->aio_lio_opcode = LIO_READ; } /* for (i < argc) */



/* Поехали ... */ assert (lio_listio (LIO_NOWAIT, lio_ptr, argc, &lio_ptr [0]->aio_sigevent) == 0);

/* Будем ждать завершения */ /* операций ввода/вывода, */ /* обрабатывать прочитанные */ /* данные и */ /* инициировать новые запросы */ /* на чтение */ while (aio_suspend (( const struct aiocb **) lio_ptr, argc, NULL) == 0) { /* Выясним, какой запрос */ /* и как выполнен */

/* Число недочитанных файлов */ int nreqs = 0;

for (i = 0; i < argc; i++) { if (lio_ptr [i] == NULL) { continue; }

/* Есть обслуживаемые */ /* запросы */ nreqs++;

if (aio_error (lio_ptr [i]) == EINPROGRESS) { continue; } { // Запрос выполнен

// Число прочитанных байт ssize_t nb;

if ((nb = aio_return (lio_ptr [i])) <= 0) { /* Дошли до конца файла*/ /* или чтение */ /* завершилось ошибкой */ (void) close(lio_ptr [i]->aio_fildes); free ((void *) lio_ptr [i]->aio_buf); free (lio_ptr [i]); lio_ptr [i] = NULL; nreqs--; } else { /* Обработаем прочитанные */ /* данные */

/* Текущее начало поиска */ /* перевода строки */ char *p; /* Позиция, где нашли */ /* перевод строки */ char *p1; p = (char *) lio_ptr [i]->aio_buf; while ((p1 = (char *) memchr (p, '\n', nb - (p – (char *) lio_ptr [i]->aio_buf))) != NULL) { nlines++; p = p1 + 1; }

/* Инициируем новый */ /* запрос на чтение */ lio_ptr [i]->aio_offset += nb; (void) aio_read (lio_ptr [i]); } } } /* for (i < argc) */

/* Остались недочитанные */ /* файлы? */ if (nreqs == 0) { break; } } /* while */

printf ("%ld\n", nlines);

return 0; }

Листинг 7.7. Пример программы, использующей функции асинхронного ввода/вывода.

Отметим списочную форму начального представления и ожидания выполнения запросов на асинхронное чтение. Из технических деталей обратим внимание на пустой указатель в качестве элемента массива указателей на структуры типа aiocb (функция lio_listio() игнорирует такие элементы) и в качестве аргумента timeout функции aio_suspend() (означает бесконечное ожидание).

Второй вариант решения той же задачи (см.


листинг 7.8) демонстрирует другой способ уведомления о завершении операции асинхронного ввода/вывода – вызов функции.

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Программа подсчитывает суммарное число строк */ /* в файлах – аргументах командной строки. */ /* Если аргументы отсутствуют или в качестве имени */ /* задан минус, читается стандартный ввод. */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */

#include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <fcntl.h> #include <signal.h> #include <string.h> #include <aio.h> #include <pthread.h> #include <assert.h>

/* Указатель на массив указателей на управляющие */ /* блоки запросов операций асинхронного чтения */ static struct aiocb **lio_ptr;

/* Общее число строк в файлах */ static long nlines_total = 0;

/* Переменная условия, на которой ждут */ /* окончания обработки всех файлов */ static pthread_cond_t open_files_cond = PTHREAD_COND_INITIALIZER;

/* Мьютекс, охраняющий доступ */ /* к переменной условия и nlines_total */ static pthread_mutex_t nlines_mutex = PTHREAD_MUTEX_INITIALIZER;

/* * * * * * * * * * * * * * */ /* Функция, вызываемая */ /* при завершении операции */ /* асинхронного ввода/вывода */ /* * * * * * * * * * * * * * */ static void end_of_aio_op (union sigval sg_vl) { int no; /* Номер выполненного запроса */ /* в общем списке */ ssize_t nb; /* Число прочитанных байт */ long nlines = 0; /* Число строк в одной */ /* прочитанной порции */ no = sg_vl.sival_int;

if ((nb = aio_return (lio_ptr [no])) <= 0) { /* Дошли до конца файла */ /* или чтение завершилось ошибкой */ (void) close (lio_ptr [no]->aio_fildes); free ((void *) lio_ptr [no]->aio_buf); free (lio_ptr [no]); lio_ptr [no] = NULL; (void) pthread_cond_signal (&open_files_cond); } else { /* Обработаем прочитанные данные */ char *p; /* Текущее начало поиска перевода строки */ char *p1; /* Позиция, где нашли перевод строки */

p = (char *) lio_ptr [no]->aio_buf; while ((p1 = (char *) memchr (p, '\n', nb - (p – (char *) lio_ptr [no]->aio_buf))) != NULL) { nlines++; p = p1 + 1; }



/* Прибавим локальную сумму к общей */ (void) pthread_mutex_lock (&nlines_mutex); nlines_total += nlines; (void) pthread_mutex_unlock (&nlines_mutex);

/* Инициируем новый запрос на чтение */ lio_ptr [no]->aio_offset += nb; (void) aio_read (lio_ptr [no]); } }

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Функция проверяет, сколько осталось открытых файлов */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ static int n_open_files (int nfiles) { int nof = 0; /* Число открытых файлов */ int i;

for (i = 0; i < nfiles; i++) { if (lio_ptr [i] != NULL) { nof++; } }

return (nof); }

/* * * * * * * * * * * * * * * * * * * * */ /* Обработка аргументов командной строки, */ /* инициация начальных запросов на чтение, */ /* ожидание завершения обработки файлов, */ /* вывод результатов */ /* * * * * * * * * * * * * * * * * * * * */ int main (int argc, char *argv []) { int dup_stdin = 0; /* Признак повторного указания */ /* стандартного ввода */ int i;

/* Сведем случай с отсутствием аргументов к общему, */ /* воспользовавшись одним из немногих */ /* стандартизованных файлов */ if (argc == 1) { argv [0] = "-"; } else { argv [0] = "/dev/null"; }

/* Зарезервируем память, откроем заданные файлы */ /* и инициируем начальные запросы на чтение */ assert ((lio_ptr = (struct aiocb **) malloc (sizeof (struct aiocb *) * argc)) != NULL);

for (i = 0; i < argc; i++) { assert ((lio_ptr [i] = (struct aiocb *) malloc (sizeof (struct aiocb))) != NULL);

if (strcmp (argv [i], "-") == 0) { if (dup_stdin == 0) { lio_ptr [i]->aio_fildes = STDIN_FILENO; dup_stdin = 1; } else { lio_ptr [i]->aio_fildes = fcntl (STDIN_FILENO, F_DUPFD, 0); } } else if ((lio_ptr [i]->aio_fildes = open (argv [i], O_RDONLY)) == -1) { perror ("OPEN"); free (lio_ptr [i]); lio_ptr [i] = NULL; continue; }

lio_ptr [i]->aio_offset = 0; assert ((lio_ptr [i]->aio_buf = malloc (BUFSIZ)) != NULL); lio_ptr [i]->aio_nbytes = BUFSIZ; lio_ptr [i]->aio_reqprio = 0;



lio_ptr [i]->aio_sigevent.sigev_notify = SIGEV_THREAD; lio_ptr [i]->aio_sigevent.sigev_signo = SIGRTMIN; lio_ptr [i]->aio_sigevent.sigev_value.sival_int = i; lio_ptr [i]->aio_sigevent.sigev_notify_function = end_of_aio_op; lio_ptr [i]->aio_sigevent.sigev_notify_attributes = NULL;

/* Запрос готов, можно отправлять его на выполнение */ (void) aio_read (lio_ptr [i]); } /* for (i < argc) */

/* Дождемся завершения обработки всех указанных */ /* в командной строке файлов */ while (n_open_files (argc) > 0) { (void) pthread_cond_wait (&open_files_cond, &nlines_mutex); }

printf ("%ld\n", nlines_total);

return 0; }

Листинг 7.8. Модифицированный вариант программы, использующей функции асинхронного ввода/вывода.

В модифицированном варианте можно отметить применение переменных условия как средства ожидания завершения "вдвойне асинхронной" обработки файлов (по вводу/выводу и по выполнению функций процессирования прочитанных данных в рамках специально создаваемых потоков управления). Когда очередной файл закрывается, то для генерации уведомления о возможном завершении всей обработки вызывается функция pthread_cond_signal(). Обратим внимание на то, что в этот момент вызывающий поток не является владельцем мьютекса, ассоциированного с переменной условия, но в данном случае это и не требуется.


Содержание раздела