Projects
Wiki     Timeline     Browser     Search     New Ticket     Bug Reports

source: branches/Lion/src/io.c @ 215

Revision 202, 63.6 KB checked in by dsteffen@…, 3 years ago (diff)

import Lion libdispatch-187.5 source drop

Line 
1/*
2 * Copyright (c) 2009-2011 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21#include "internal.h"
22
23typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry);
24
25DISPATCH_EXPORT DISPATCH_NOTHROW
26void _dispatch_iocntl(uint32_t param, uint64_t value);
27
28static void _dispatch_io_dispose(dispatch_io_t channel);
29static dispatch_operation_t _dispatch_operation_create(
30                dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset,
31                size_t length, dispatch_data_t data, dispatch_queue_t queue,
32                dispatch_io_handler_t handler);
33static void _dispatch_operation_dispose(dispatch_operation_t operation);
34static void _dispatch_operation_enqueue(dispatch_operation_t op,
35                dispatch_op_direction_t direction, dispatch_data_t data);
36static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq,
37                dispatch_operation_t op);
38static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry);
39static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry);
40static void _dispatch_fd_entry_init_async(dispatch_fd_t fd,
41                dispatch_fd_entry_init_callback_t completion_callback);
42static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd,
43                uintptr_t hash);
44static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path(
45                dispatch_io_path_data_t path_data, dev_t dev, mode_t mode);
46static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry,
47                dispatch_io_t channel);
48static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
49                dispatch_io_t channel);
50static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry,
51                dispatch_queue_t tq);
52static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
53                dispatch_op_direction_t direction);
54static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev);
55static void _dispatch_disk_dispose(dispatch_disk_t disk);
56static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
57                dispatch_operation_t operation, dispatch_data_t data);
58static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk,
59                dispatch_operation_t operation, dispatch_data_t data);
60static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
61                dispatch_io_t channel);
62static void _dispatch_disk_cleanup_operations(dispatch_disk_t disk,
63                dispatch_io_t channel);
64static void _dispatch_stream_source_handler(void *ctx);
65static void _dispatch_stream_handler(void *ctx);
66static void _dispatch_disk_handler(void *ctx);
67static void _dispatch_disk_perform(void *ctxt);
68static void _dispatch_operation_advise(dispatch_operation_t op,
69                size_t chunk_size);
70static int _dispatch_operation_perform(dispatch_operation_t op);
71static void _dispatch_operation_deliver_data(dispatch_operation_t op,
72                dispatch_op_flags_t flags);
73
74// Macros to wrap syscalls which return -1 on error, and retry on EINTR
75#define _dispatch_io_syscall_switch_noerr(_err, _syscall, ...) do { \
76                switch (((_err) = (((_syscall) == -1) ? errno : 0))) { \
77                case EINTR: continue; \
78                __VA_ARGS__ \
79                } \
80        } while (0)
81#define _dispatch_io_syscall_switch(__err, __syscall, ...) do { \
82                _dispatch_io_syscall_switch_noerr(__err, __syscall, \
83                case 0: break; \
84                __VA_ARGS__ \
85                ); \
86        } while (0)
87#define _dispatch_io_syscall(__syscall) do { int __err; \
88                _dispatch_io_syscall_switch(__err, __syscall); \
89        } while (0)
90
91enum {
92        DISPATCH_OP_COMPLETE = 1,
93        DISPATCH_OP_DELIVER,
94        DISPATCH_OP_DELIVER_AND_COMPLETE,
95        DISPATCH_OP_COMPLETE_RESUME,
96        DISPATCH_OP_RESUME,
97        DISPATCH_OP_ERR,
98        DISPATCH_OP_FD_ERR,
99};
100
101#pragma mark -
102#pragma mark dispatch_io_vtable
103
104static const struct dispatch_io_vtable_s _dispatch_io_vtable = {
105        .do_type = DISPATCH_IO_TYPE,
106        .do_kind = "channel",
107        .do_dispose = _dispatch_io_dispose,
108        .do_invoke = NULL,
109        .do_probe = (void *)dummy_function_r0,
110        .do_debug = (void *)dummy_function_r0,
111};
112
113static const struct dispatch_operation_vtable_s _dispatch_operation_vtable = {
114        .do_type = DISPATCH_OPERATION_TYPE,
115        .do_kind = "operation",
116        .do_dispose = _dispatch_operation_dispose,
117        .do_invoke = NULL,
118        .do_probe = (void *)dummy_function_r0,
119        .do_debug = (void *)dummy_function_r0,
120};
121
122static const struct dispatch_disk_vtable_s _dispatch_disk_vtable = {
123        .do_type = DISPATCH_DISK_TYPE,
124        .do_kind = "disk",
125        .do_dispose = _dispatch_disk_dispose,
126        .do_invoke = NULL,
127        .do_probe = (void *)dummy_function_r0,
128        .do_debug = (void *)dummy_function_r0,
129};
130
131#pragma mark -
132#pragma mark dispatch_io_hashtables
133
134#if TARGET_OS_EMBEDDED
135#define DIO_HASH_SIZE  64u // must be a power of two
136#else
137#define DIO_HASH_SIZE 256u // must be a power of two
138#endif
139#define DIO_HASH(x) ((uintptr_t)((x) & (DIO_HASH_SIZE - 1)))
140
141// Global hashtable of dev_t -> disk_s mappings
142DISPATCH_CACHELINE_ALIGN
143static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE];
144// Global hashtable of fd -> fd_entry_s mappings
145DISPATCH_CACHELINE_ALIGN
146static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE];
147
148static dispatch_once_t  _dispatch_io_devs_lockq_pred;
149static dispatch_queue_t _dispatch_io_devs_lockq;
150static dispatch_queue_t _dispatch_io_fds_lockq;
151
152static void
153_dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED)
154{
155        _dispatch_io_fds_lockq = dispatch_queue_create(
156                        "com.apple.libdispatch-io.fd_lockq", NULL);
157        unsigned int i;
158        for (i = 0; i < DIO_HASH_SIZE; i++) {
159                TAILQ_INIT(&_dispatch_io_fds[i]);
160        }
161}
162
163static void
164_dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED)
165{
166        _dispatch_io_devs_lockq = dispatch_queue_create(
167                        "com.apple.libdispatch-io.dev_lockq", NULL);
168        unsigned int i;
169        for (i = 0; i < DIO_HASH_SIZE; i++) {
170                TAILQ_INIT(&_dispatch_io_devs[i]);
171        }
172}
173
174#pragma mark -
175#pragma mark dispatch_io_defaults
176
177enum {
178        DISPATCH_IOCNTL_CHUNK_PAGES = 1,
179        DISPATCH_IOCNTL_LOW_WATER_CHUNKS,
180        DISPATCH_IOCNTL_INITIAL_DELIVERY,
181        DISPATCH_IOCNTL_MAX_PENDING_IO_REQS,
182};
183
184static struct dispatch_io_defaults_s {
185        size_t chunk_pages, low_water_chunks, max_pending_io_reqs;
186        bool initial_delivery;
187} dispatch_io_defaults = {
188        .chunk_pages = DIO_MAX_CHUNK_PAGES,
189        .low_water_chunks = DIO_DEFAULT_LOW_WATER_CHUNKS,
190        .max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS,
191};
192
193#define _dispatch_iocntl_set_default(p, v) do { \
194                dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \
195        } while (0)
196
197void
198_dispatch_iocntl(uint32_t param, uint64_t value)
199{
200        switch (param) {
201        case DISPATCH_IOCNTL_CHUNK_PAGES:
202                _dispatch_iocntl_set_default(chunk_pages, value);
203                break;
204        case DISPATCH_IOCNTL_LOW_WATER_CHUNKS:
205                _dispatch_iocntl_set_default(low_water_chunks, value);
206                break;
207        case DISPATCH_IOCNTL_INITIAL_DELIVERY:
208                _dispatch_iocntl_set_default(initial_delivery, value);
209        case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS:
210                _dispatch_iocntl_set_default(max_pending_io_reqs, value);
211                break;
212        }
213}
214
215#pragma mark -
216#pragma mark dispatch_io_t
217
218static dispatch_io_t
219_dispatch_io_create(dispatch_io_type_t type)
220{
221        dispatch_io_t channel = calloc(1ul, sizeof(struct dispatch_io_s));
222        channel->do_vtable = &_dispatch_io_vtable;
223        channel->do_next = DISPATCH_OBJECT_LISTLESS;
224        channel->do_ref_cnt = 1;
225        channel->do_xref_cnt = 1;
226        channel->do_targetq = _dispatch_get_root_queue(0, true);
227        channel->params.type = type;
228        channel->params.high = SIZE_MAX;
229        channel->params.low = dispatch_io_defaults.low_water_chunks *
230                        dispatch_io_defaults.chunk_pages * PAGE_SIZE;
231        channel->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq",
232                        NULL);
233        return channel;
234}
235
236static void
237_dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry,
238                dispatch_queue_t queue, int err, void (^cleanup_handler)(int))
239{
240        // Enqueue the cleanup handler on the suspended close queue
241        if (cleanup_handler) {
242                _dispatch_retain(queue);
243                dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{
244                        dispatch_async(queue, ^{
245                                _dispatch_io_debug("cleanup handler invoke", -1);
246                                cleanup_handler(err);
247                        });
248                        _dispatch_release(queue);
249                });
250        }
251        if (fd_entry) {
252                channel->fd_entry = fd_entry;
253                dispatch_retain(fd_entry->barrier_queue);
254                dispatch_retain(fd_entry->barrier_group);
255                channel->barrier_queue = fd_entry->barrier_queue;
256                channel->barrier_group = fd_entry->barrier_group;
257        } else {
258                // Still need to create a barrier queue, since all operations go
259                // through it
260                channel->barrier_queue = dispatch_queue_create(
261                                "com.apple.libdispatch-io.barrierq", NULL);
262                channel->barrier_group = dispatch_group_create();
263        }
264}
265
266static void
267_dispatch_io_dispose(dispatch_io_t channel)
268{
269        if (channel->fd_entry) {
270                if (channel->fd_entry->path_data) {
271                        // This modification is safe since path_data->channel is checked
272                        // only on close_queue (which is still suspended at this point)
273                        channel->fd_entry->path_data->channel = NULL;
274                }
275                // Cleanup handlers will only run when all channels related to this
276                // fd are complete
277                _dispatch_fd_entry_release(channel->fd_entry);
278        }
279        if (channel->queue) {
280                dispatch_release(channel->queue);
281        }
282        if (channel->barrier_queue) {
283                dispatch_release(channel->barrier_queue);
284        }
285        if (channel->barrier_group) {
286                dispatch_release(channel->barrier_group);
287        }
288        _dispatch_dispose(channel);
289}
290
291static int
292_dispatch_io_validate_type(dispatch_io_t channel, mode_t mode)
293{
294        int err = 0;
295        if (S_ISDIR(mode)) {
296                err = EISDIR;
297        } else if (channel->params.type == DISPATCH_IO_RANDOM &&
298                        (S_ISFIFO(mode) || S_ISSOCK(mode))) {
299                err = ESPIPE;
300        }
301        return err;
302}
303
304static int
305_dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel,
306                bool ignore_closed)
307{
308        // On _any_ queue
309        int err;
310        if (op) {
311                channel = op->channel;
312        }
313        if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
314                if (!ignore_closed || channel->atomic_flags & DIO_STOPPED) {
315                        err = ECANCELED;
316                } else {
317                        err = 0;
318                }
319        } else {
320                err = op ? op->fd_entry->err : channel->err;
321        }
322        return err;
323}
324
325#pragma mark -
326#pragma mark dispatch_io_channels
327
328dispatch_io_t
329dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd,
330                dispatch_queue_t queue, void (^cleanup_handler)(int))
331{
332        if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
333                return NULL;
334        }
335        _dispatch_io_debug("io create", fd);
336        dispatch_io_t channel = _dispatch_io_create(type);
337        channel->fd = fd;
338        channel->fd_actual = fd;
339        dispatch_suspend(channel->queue);
340        _dispatch_retain(queue);
341        _dispatch_retain(channel);
342        _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
343                // On barrier queue
344                int err = fd_entry->err;
345                if (!err) {
346                        err = _dispatch_io_validate_type(channel, fd_entry->stat.mode);
347                }
348                if (!err && type == DISPATCH_IO_RANDOM) {
349                        off_t f_ptr;
350                        _dispatch_io_syscall_switch_noerr(err,
351                                f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR),
352                                case 0: channel->f_ptr = f_ptr; break;
353                                default: (void)dispatch_assume_zero(err); break;
354                        );
355                }
356                channel->err = err;
357                _dispatch_fd_entry_retain(fd_entry);
358                _dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler);
359                dispatch_resume(channel->queue);
360                _dispatch_release(channel);
361                _dispatch_release(queue);
362        });
363        return channel;
364}
365
366dispatch_io_t
367dispatch_io_create_with_path(dispatch_io_type_t type, const char *path,
368                int oflag, mode_t mode, dispatch_queue_t queue,
369                void (^cleanup_handler)(int error))
370{
371        if ((type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) ||
372                        !(path && *path == '/')) {
373                return NULL;
374        }
375        size_t pathlen = strlen(path);
376        dispatch_io_path_data_t path_data = malloc(sizeof(*path_data) + pathlen+1);
377        if (!path_data) {
378                return NULL;
379        }
380        _dispatch_io_debug("io create with path %s", -1, path);
381        dispatch_io_t channel = _dispatch_io_create(type);
382        channel->fd = -1;
383        channel->fd_actual = -1;
384        path_data->channel = channel;
385        path_data->oflag = oflag;
386        path_data->mode = mode;
387        path_data->pathlen = pathlen;
388        memcpy(path_data->path, path, pathlen + 1);
389        _dispatch_retain(queue);
390        _dispatch_retain(channel);
391        dispatch_async(channel->queue, ^{
392                int err = 0;
393                struct stat st;
394                _dispatch_io_syscall_switch_noerr(err,
395                        (path_data->oflag & O_NOFOLLOW) == O_NOFOLLOW ||
396                                        (path_data->oflag & O_SYMLINK) == O_SYMLINK ?
397                                        lstat(path_data->path, &st) : stat(path_data->path, &st),
398                        case 0:
399                                err = _dispatch_io_validate_type(channel, st.st_mode);
400                                break;
401                        default:
402                                if ((path_data->oflag & O_CREAT) &&
403                                                (*(path_data->path + path_data->pathlen - 1) != '/')) {
404                                        // Check parent directory
405                                        char *c = strrchr(path_data->path, '/');
406                                        dispatch_assert(c);
407                                        *c = 0;
408                                        int perr;
409                                        _dispatch_io_syscall_switch_noerr(perr,
410                                                stat(path_data->path, &st),
411                                                case 0:
412                                                        // Since the parent directory exists, open() will
413                                                        // create a regular file after the fd_entry has
414                                                        // been filled in
415                                                        st.st_mode = S_IFREG;
416                                                        err = 0;
417                                                        break;
418                                        );
419                                        *c = '/';
420                                }
421                                break;
422                );
423                channel->err = err;
424                if (err) {
425                        free(path_data);
426                        _dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
427                        _dispatch_release(channel);
428                        _dispatch_release(queue);
429                        return;
430                }
431                dispatch_suspend(channel->queue);
432                dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
433                                _dispatch_io_devs_lockq_init);
434                dispatch_async(_dispatch_io_devs_lockq, ^{
435                        dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create_with_path(
436                                        path_data, st.st_dev, st.st_mode);
437                        _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
438                        dispatch_resume(channel->queue);
439                        _dispatch_release(channel);
440                        _dispatch_release(queue);
441                });
442        });
443        return channel;
444}
445
446dispatch_io_t
447dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel,
448                dispatch_queue_t queue, void (^cleanup_handler)(int error))
449{
450        if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
451                return NULL;
452        }
453        _dispatch_io_debug("io create with io %p", -1, in_channel);
454        dispatch_io_t channel = _dispatch_io_create(type);
455        dispatch_suspend(channel->queue);
456        _dispatch_retain(queue);
457        _dispatch_retain(channel);
458        _dispatch_retain(in_channel);
459        dispatch_async(in_channel->queue, ^{
460                int err0 = _dispatch_io_get_error(NULL, in_channel, false);
461                if (err0) {
462                        channel->err = err0;
463                        _dispatch_io_init(channel, NULL, queue, err0, cleanup_handler);
464                        dispatch_resume(channel->queue);
465                        _dispatch_release(channel);
466                        _dispatch_release(in_channel);
467                        _dispatch_release(queue);
468                        return;
469                }
470                dispatch_async(in_channel->barrier_queue, ^{
471                        int err = _dispatch_io_get_error(NULL, in_channel, false);
472                        // If there is no error, the fd_entry for the in_channel is valid.
473                        // Since we are running on in_channel's queue, the fd_entry has been
474                        // fully resolved and will stay valid for the duration of this block
475                        if (!err) {
476                                err = in_channel->err;
477                                if (!err) {
478                                        err = in_channel->fd_entry->err;
479                                }
480                        }
481                        if (!err) {
482                                err = _dispatch_io_validate_type(channel,
483                                                in_channel->fd_entry->stat.mode);
484                        }
485                        if (!err && type == DISPATCH_IO_RANDOM && in_channel->fd != -1) {
486                                off_t f_ptr;
487                                _dispatch_io_syscall_switch_noerr(err,
488                                        f_ptr = lseek(in_channel->fd_entry->fd, 0, SEEK_CUR),
489                                        case 0: channel->f_ptr = f_ptr; break;
490                                        default: (void)dispatch_assume_zero(err); break;
491                                );
492                        }
493                        channel->err = err;
494                        if (err) {
495                                _dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
496                                dispatch_resume(channel->queue);
497                                _dispatch_release(channel);
498                                _dispatch_release(in_channel);
499                                _dispatch_release(queue);
500                                return;
501                        }
502                        if (in_channel->fd == -1) {
503                                // in_channel was created from path
504                                channel->fd = -1;
505                                channel->fd_actual = -1;
506                                mode_t mode = in_channel->fd_entry->stat.mode;
507                                dev_t dev = in_channel->fd_entry->stat.dev;
508                                size_t path_data_len = sizeof(struct dispatch_io_path_data_s) +
509                                                in_channel->fd_entry->path_data->pathlen + 1;
510                                dispatch_io_path_data_t path_data = malloc(path_data_len);
511                                memcpy(path_data, in_channel->fd_entry->path_data,
512                                                path_data_len);
513                                path_data->channel = channel;
514                                // lockq_io_devs is known to already exist
515                                dispatch_async(_dispatch_io_devs_lockq, ^{
516                                        dispatch_fd_entry_t fd_entry;
517                                        fd_entry = _dispatch_fd_entry_create_with_path(path_data,
518                                                        dev, mode);
519                                        _dispatch_io_init(channel, fd_entry, queue, 0,
520                                                        cleanup_handler);
521                                        dispatch_resume(channel->queue);
522                                        _dispatch_release(channel);
523                                        _dispatch_release(queue);
524                                });
525                        } else {
526                                dispatch_fd_entry_t fd_entry = in_channel->fd_entry;
527                                channel->fd = in_channel->fd;
528                                channel->fd_actual = in_channel->fd_actual;
529                                _dispatch_fd_entry_retain(fd_entry);
530                                _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
531                                dispatch_resume(channel->queue);
532                                _dispatch_release(channel);
533                                _dispatch_release(queue);
534                        }
535                        _dispatch_release(in_channel);
536                });
537        });
538        return channel;
539}
540
541#pragma mark -
542#pragma mark dispatch_io_accessors
543
544void
545dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water)
546{
547        _dispatch_retain(channel);
548        dispatch_async(channel->queue, ^{
549                _dispatch_io_debug("io set high water", channel->fd);
550                if (channel->params.low > high_water) {
551                        channel->params.low = high_water;
552                }
553                channel->params.high = high_water ? high_water : 1;
554                _dispatch_release(channel);
555        });
556}
557
558void
559dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water)
560{
561        _dispatch_retain(channel);
562        dispatch_async(channel->queue, ^{
563                _dispatch_io_debug("io set low water", channel->fd);
564                if (channel->params.high < low_water) {
565                        channel->params.high = low_water ? low_water : 1;
566                }
567                channel->params.low = low_water;
568                _dispatch_release(channel);
569        });
570}
571
572void
573dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval,
574                unsigned long flags)
575{
576        _dispatch_retain(channel);
577        dispatch_async(channel->queue, ^{
578                _dispatch_io_debug("io set interval", channel->fd);
579                channel->params.interval = interval;
580                channel->params.interval_flags = flags;
581                _dispatch_release(channel);
582        });
583}
584
585void
586_dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq)
587{
588        _dispatch_retain(dq);
589        _dispatch_retain(channel);
590        dispatch_async(channel->queue, ^{
591                dispatch_queue_t prev_dq = channel->do_targetq;
592                channel->do_targetq = dq;
593                _dispatch_release(prev_dq);
594                _dispatch_release(channel);
595        });
596}
597
598dispatch_fd_t
599dispatch_io_get_descriptor(dispatch_io_t channel)
600{
601        if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
602                return -1;
603        }
604        dispatch_fd_t fd = channel->fd_actual;
605        if (fd == -1 &&
606                        _dispatch_thread_getspecific(dispatch_io_key) == channel) {
607                dispatch_fd_entry_t fd_entry = channel->fd_entry;
608                (void)_dispatch_fd_entry_open(fd_entry, channel);
609        }
610        return channel->fd_actual;
611}
612
613#pragma mark -
614#pragma mark dispatch_io_operations
615
616static void
617_dispatch_io_stop(dispatch_io_t channel)
618{
619        _dispatch_io_debug("io stop", channel->fd);
620        (void)dispatch_atomic_or2o(channel, atomic_flags, DIO_STOPPED);
621        _dispatch_retain(channel);
622        dispatch_async(channel->queue, ^{
623                dispatch_async(channel->barrier_queue, ^{
624                        dispatch_fd_entry_t fd_entry = channel->fd_entry;
625                        if (fd_entry) {
626                                _dispatch_io_debug("io stop cleanup", channel->fd);
627                                _dispatch_fd_entry_cleanup_operations(fd_entry, channel);
628                                channel->fd_entry = NULL;
629                                _dispatch_fd_entry_release(fd_entry);
630                        } else if (channel->fd != -1) {
631                                // Stop after close, need to check if fd_entry still exists
632                                _dispatch_retain(channel);
633                                dispatch_async(_dispatch_io_fds_lockq, ^{
634                                        _dispatch_io_debug("io stop after close cleanup",
635                                                        channel->fd);
636                                        dispatch_fd_entry_t fdi;
637                                        uintptr_t hash = DIO_HASH(channel->fd);
638                                        TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) {
639                                                if (fdi->fd == channel->fd) {
640                                                        _dispatch_fd_entry_cleanup_operations(fdi, channel);
641                                                        break;
642                                                }
643                                        }
644                                        _dispatch_release(channel);
645                                });
646                        }
647                        _dispatch_release(channel);
648                });
649        });
650}
651
652void
653dispatch_io_close(dispatch_io_t channel, unsigned long flags)
654{
655        if (flags & DISPATCH_IO_STOP) {
656                // Don't stop an already stopped channel
657                if (channel->atomic_flags & DIO_STOPPED) {
658                        return;
659                }
660                return _dispatch_io_stop(channel);
661        }
662        // Don't close an already closed or stopped channel
663        if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
664                return;
665        }
666        _dispatch_retain(channel);
667        dispatch_async(channel->queue, ^{
668                dispatch_async(channel->barrier_queue, ^{
669                        _dispatch_io_debug("io close", channel->fd);
670                        (void)dispatch_atomic_or2o(channel, atomic_flags, DIO_CLOSED);
671                        dispatch_fd_entry_t fd_entry = channel->fd_entry;
672                        if (fd_entry) {
673                                if (!fd_entry->path_data) {
674                                        channel->fd_entry = NULL;
675                                }
676                                _dispatch_fd_entry_release(fd_entry);
677                        }
678                        _dispatch_release(channel);
679                });
680        });
681}
682
683void
684dispatch_io_barrier(dispatch_io_t channel, dispatch_block_t barrier)
685{
686        _dispatch_retain(channel);
687        dispatch_async(channel->queue, ^{
688                dispatch_queue_t io_q = channel->do_targetq;
689                dispatch_queue_t barrier_queue = channel->barrier_queue;
690                dispatch_group_t barrier_group = channel->barrier_group;
691                dispatch_async(barrier_queue, ^{
692                        dispatch_suspend(barrier_queue);
693                        dispatch_group_notify(barrier_group, io_q, ^{
694                                _dispatch_thread_setspecific(dispatch_io_key, channel);
695                                barrier();
696                                _dispatch_thread_setspecific(dispatch_io_key, NULL);
697                                dispatch_resume(barrier_queue);
698                                _dispatch_release(channel);
699                        });
700                });
701        });
702}
703
704void
705dispatch_io_read(dispatch_io_t channel, off_t offset, size_t length,
706                dispatch_queue_t queue, dispatch_io_handler_t handler)
707{
708        _dispatch_retain(channel);
709        _dispatch_retain(queue);
710        dispatch_async(channel->queue, ^{
711                dispatch_operation_t op;
712                op = _dispatch_operation_create(DOP_DIR_READ, channel, offset,
713                                length, dispatch_data_empty, queue, handler);
714                if (op) {
715                        dispatch_queue_t barrier_q = channel->barrier_queue;
716                        dispatch_async(barrier_q, ^{
717                                _dispatch_operation_enqueue(op, DOP_DIR_READ,
718                                                dispatch_data_empty);
719                        });
720                }
721                _dispatch_release(channel);
722                _dispatch_release(queue);
723        });
724}
725
726void
727dispatch_io_write(dispatch_io_t channel, off_t offset, dispatch_data_t data,
728                dispatch_queue_t queue, dispatch_io_handler_t handler)
729{
730        _dispatch_io_data_retain(data);
731        _dispatch_retain(channel);
732        _dispatch_retain(queue);
733        dispatch_async(channel->queue, ^{
734                dispatch_operation_t op;
735                op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset,
736                                dispatch_data_get_size(data), data, queue, handler);
737                if (op) {
738                        dispatch_queue_t barrier_q = channel->barrier_queue;
739                        dispatch_async(barrier_q, ^{
740                                _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
741                                _dispatch_io_data_release(data);
742                        });
743                } else {
744                        _dispatch_io_data_release(data);
745                }
746                _dispatch_release(channel);
747                _dispatch_release(queue);
748        });
749}
750
751void
752dispatch_read(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
753                void (^handler)(dispatch_data_t, int))
754{
755        _dispatch_retain(queue);
756        _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
757                // On barrier queue
758                if (fd_entry->err) {
759                        int err = fd_entry->err;
760                        dispatch_async(queue, ^{
761                                _dispatch_io_debug("convenience handler invoke", fd);
762                                handler(dispatch_data_empty, err);
763                        });
764                        _dispatch_release(queue);
765                        return;
766                }
767                // Safe to access fd_entry on barrier queue
768                dispatch_io_t channel = fd_entry->convenience_channel;
769                if (!channel) {
770                        channel = _dispatch_io_create(DISPATCH_IO_STREAM);
771                        channel->fd = fd;
772                        channel->fd_actual = fd;
773                        channel->fd_entry = fd_entry;
774                        dispatch_retain(fd_entry->barrier_queue);
775                        dispatch_retain(fd_entry->barrier_group);
776                        channel->barrier_queue = fd_entry->barrier_queue;
777                        channel->barrier_group = fd_entry->barrier_group;
778                        fd_entry->convenience_channel = channel;
779                }
780                __block dispatch_data_t deliver_data = dispatch_data_empty;
781                __block int err = 0;
782                dispatch_async(fd_entry->close_queue, ^{
783                        dispatch_async(queue, ^{
784                                _dispatch_io_debug("convenience handler invoke", fd);
785                                handler(deliver_data, err);
786                                _dispatch_io_data_release(deliver_data);
787                        });
788                        _dispatch_release(queue);
789                });
790                dispatch_operation_t op =
791                        _dispatch_operation_create(DOP_DIR_READ, channel, 0,
792                                        length, dispatch_data_empty,
793                                        _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
794                                        false), ^(bool done, dispatch_data_t data, int error) {
795                                if (data) {
796                                        data = dispatch_data_create_concat(deliver_data, data);
797                                        _dispatch_io_data_release(deliver_data);
798                                        deliver_data = data;
799                                }
800                                if (done) {
801                                        err = error;
802                                }
803                        });
804                if (op) {
805                        _dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty);
806                }
807        });
808}
809
810void
811dispatch_write(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
812                void (^handler)(dispatch_data_t, int))
813{
814        _dispatch_io_data_retain(data);
815        _dispatch_retain(queue);
816        _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
817                // On barrier queue
818                if (fd_entry->err) {
819                        int err = fd_entry->err;
820                        dispatch_async(queue, ^{
821                                _dispatch_io_debug("convenience handler invoke", fd);
822                                handler(NULL, err);
823                        });
824                        _dispatch_release(queue);
825                        return;
826                }
827                // Safe to access fd_entry on barrier queue
828                dispatch_io_t channel = fd_entry->convenience_channel;
829                if (!channel) {
830                        channel = _dispatch_io_create(DISPATCH_IO_STREAM);
831                        channel->fd = fd;
832                        channel->fd_actual = fd;
833                        channel->fd_entry = fd_entry;
834                        dispatch_retain(fd_entry->barrier_queue);
835                        dispatch_retain(fd_entry->barrier_group);
836                        channel->barrier_queue = fd_entry->barrier_queue;
837                        channel->barrier_group = fd_entry->barrier_group;
838                        fd_entry->convenience_channel = channel;
839                }
840                __block dispatch_data_t deliver_data = NULL;
841                __block int err = 0;
842                dispatch_async(fd_entry->close_queue, ^{
843                        dispatch_async(queue, ^{
844                                _dispatch_io_debug("convenience handler invoke", fd);
845                                handler(deliver_data, err);
846                                if (deliver_data) {
847                                        _dispatch_io_data_release(deliver_data);
848                                }
849                        });
850                        _dispatch_release(queue);
851                });
852                dispatch_operation_t op =
853                        _dispatch_operation_create(DOP_DIR_WRITE, channel, 0,
854                                        dispatch_data_get_size(data), data,
855                                        _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
856                                        false), ^(bool done, dispatch_data_t d, int error) {
857                                if (done) {
858                                        if (d) {
859                                                _dispatch_io_data_retain(d);
860                                                deliver_data = d;
861                                        }
862                                        err = error;
863                                }
864                        });
865                if (op) {
866                        _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
867                }
868                _dispatch_io_data_release(data);
869        });
870}
871
872#pragma mark -
873#pragma mark dispatch_operation_t
874
875static dispatch_operation_t
876_dispatch_operation_create(dispatch_op_direction_t direction,
877                dispatch_io_t channel, off_t offset, size_t length,
878                dispatch_data_t data, dispatch_queue_t queue,
879                dispatch_io_handler_t handler)
880{
881        // On channel queue
882        dispatch_assert(direction < DOP_DIR_MAX);
883        _dispatch_io_debug("operation create", channel->fd);
884#if DISPATCH_IO_DEBUG
885        int fd = channel->fd;
886#endif
887        // Safe to call _dispatch_io_get_error() with channel->fd_entry since
888        // that can only be NULL if atomic_flags are set rdar://problem/8362514
889        int err = _dispatch_io_get_error(NULL, channel, false);
890        if (err || !length) {
891                _dispatch_io_data_retain(data);
892                _dispatch_retain(queue);
893                dispatch_async(channel->barrier_queue, ^{
894                        dispatch_async(queue, ^{
895                                dispatch_data_t d = data;
896                                if (direction == DOP_DIR_READ && err) {
897                                        d = NULL;
898                                } else if (direction == DOP_DIR_WRITE && !err) {
899                                        d = NULL;
900                                }
901                                _dispatch_io_debug("IO handler invoke", fd);
902                                handler(true, d, err);
903                                _dispatch_io_data_release(data);
904                        });
905                        _dispatch_release(queue);
906                });
907                return NULL;
908        }
909        dispatch_operation_t op;
910        op = calloc(1ul, sizeof(struct dispatch_operation_s));
911        op->do_vtable = &_dispatch_operation_vtable;
912        op->do_next = DISPATCH_OBJECT_LISTLESS;
913        op->do_ref_cnt = 1;
914        op->do_xref_cnt = 0; // operation object is not exposed externally
915        op->op_q = dispatch_queue_create("com.apple.libdispatch-io.opq", NULL);
916        op->op_q->do_targetq = queue;
917        _dispatch_retain(queue);
918        op->active = false;
919        op->direction = direction;
920        op->offset = offset + channel->f_ptr;
921        op->length = length;
922        op->handler = Block_copy(handler);
923        _dispatch_retain(channel);
924        op->channel = channel;
925        op->params = channel->params;
926        // Take a snapshot of the priority of the channel queue. The actual I/O
927        // for this operation will be performed at this priority
928        dispatch_queue_t targetq = op->channel->do_targetq;
929        while (fastpath(targetq->do_targetq)) {
930                targetq = targetq->do_targetq;
931        }
932        op->do_targetq = targetq;
933        return op;
934}
935
936static void
937_dispatch_operation_dispose(dispatch_operation_t op)
938{
939        // Deliver the data if there's any
940        if (op->fd_entry) {
941                _dispatch_operation_deliver_data(op, DOP_DONE);
942                dispatch_group_leave(op->fd_entry->barrier_group);
943                _dispatch_fd_entry_release(op->fd_entry);
944        }
945        if (op->channel) {
946                _dispatch_release(op->channel);
947        }
948        if (op->timer) {
949                dispatch_release(op->timer);
950        }
951        // For write operations, op->buf is owned by op->buf_data
952        if (op->buf && op->direction == DOP_DIR_READ) {
953                free(op->buf);
954        }
955        if (op->buf_data) {
956                _dispatch_io_data_release(op->buf_data);
957        }
958        if (op->data) {
959                _dispatch_io_data_release(op->data);
960        }
961        if (op->op_q) {
962                dispatch_release(op->op_q);
963        }
964        Block_release(op->handler);
965        _dispatch_dispose(op);
966}
967
968static void
969_dispatch_operation_enqueue(dispatch_operation_t op,
970                dispatch_op_direction_t direction, dispatch_data_t data)
971{
972        // Called from the barrier queue
973        _dispatch_io_data_retain(data);
974        // If channel is closed or stopped, then call the handler immediately
975        int err = _dispatch_io_get_error(NULL, op->channel, false);
976        if (err) {
977                dispatch_io_handler_t handler = op->handler;
978                dispatch_async(op->op_q, ^{
979                        dispatch_data_t d = data;
980                        if (direction == DOP_DIR_READ && err) {
981                                d = NULL;
982                        } else if (direction == DOP_DIR_WRITE && !err) {
983                                d = NULL;
984                        }
985                        handler(true, d, err);
986                        _dispatch_io_data_release(data);
987                });
988                _dispatch_release(op);
989                return;
990        }
991        // Finish operation init
992        op->fd_entry = op->channel->fd_entry;
993        _dispatch_fd_entry_retain(op->fd_entry);
994        dispatch_group_enter(op->fd_entry->barrier_group);
995        dispatch_disk_t disk = op->fd_entry->disk;
996        if (!disk) {
997                dispatch_stream_t stream = op->fd_entry->streams[direction];
998                dispatch_async(stream->dq, ^{
999                        _dispatch_stream_enqueue_operation(stream, op, data);
1000                        _dispatch_io_data_release(data);
1001                });
1002        } else {
1003                dispatch_async(disk->pick_queue, ^{
1004                        _dispatch_disk_enqueue_operation(disk, op, data);
1005                        _dispatch_io_data_release(data);
1006                });
1007        }
1008}
1009
1010static bool
1011_dispatch_operation_should_enqueue(dispatch_operation_t op,
1012                dispatch_queue_t tq, dispatch_data_t data)
1013{
1014        // On stream queue or disk queue
1015        _dispatch_io_debug("enqueue operation", op->fd_entry->fd);
1016        _dispatch_io_data_retain(data);
1017        op->data = data;
1018        int err = _dispatch_io_get_error(op, NULL, true);
1019        if (err) {
1020                op->err = err;
1021                // Final release
1022                _dispatch_release(op);
1023                return false;
1024        }
1025        if (op->params.interval) {
1026                dispatch_resume(_dispatch_operation_timer(tq, op));
1027        }
1028        return true;
1029}
1030
1031static dispatch_source_t
1032_dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op)
1033{
1034        // On stream queue or pick queue
1035        if (op->timer) {
1036                return op->timer;
1037        }
1038        dispatch_source_t timer = dispatch_source_create(
1039                        DISPATCH_SOURCE_TYPE_TIMER, 0, 0, tq);
1040        dispatch_source_set_timer(timer, dispatch_time(DISPATCH_TIME_NOW,
1041                        op->params.interval), op->params.interval, 0);
1042        dispatch_source_set_event_handler(timer, ^{
1043                // On stream queue or pick queue
1044                if (dispatch_source_testcancel(timer)) {
1045                        // Do nothing. The operation has already completed
1046                        return;
1047                }
1048                dispatch_op_flags_t flags = DOP_DEFAULT;
1049                if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) {
1050                        // Deliver even if there is less data than the low-water mark
1051                        flags |= DOP_DELIVER;
1052                }
1053                // If the operation is active, dont deliver data
1054                if ((op->active) && (flags & DOP_DELIVER)) {
1055                        op->flags = flags;
1056                } else {
1057                        _dispatch_operation_deliver_data(op, flags);
1058                }
1059        });
1060        op->timer = timer;
1061        return op->timer;
1062}
1063
1064#pragma mark -
1065#pragma mark dispatch_fd_entry_t
1066
1067static inline void
1068_dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) {
1069        dispatch_suspend(fd_entry->close_queue);
1070}
1071
1072static inline void
1073_dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) {
1074        dispatch_resume(fd_entry->close_queue);
1075}
1076
1077static void
1078_dispatch_fd_entry_init_async(dispatch_fd_t fd,
1079                dispatch_fd_entry_init_callback_t completion_callback)
1080{
1081        static dispatch_once_t _dispatch_io_fds_lockq_pred;
1082        dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL,
1083                        _dispatch_io_fds_lockq_init);
1084        dispatch_async(_dispatch_io_fds_lockq, ^{
1085                _dispatch_io_debug("fd entry init", fd);
1086                dispatch_fd_entry_t fd_entry = NULL;
1087                // Check to see if there is an existing entry for the given fd
1088                uintptr_t hash = DIO_HASH(fd);
1089                TAILQ_FOREACH(fd_entry, &_dispatch_io_fds[hash], fd_list) {
1090                        if (fd_entry->fd == fd) {
1091                                // Retain the fd_entry to ensure it cannot go away until the
1092                                // stat() has completed
1093                                _dispatch_fd_entry_retain(fd_entry);
1094                                break;
1095                        }
1096                }
1097                if (!fd_entry) {
1098                        // If we did not find an existing entry, create one
1099                        fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash);
1100                }
1101                dispatch_async(fd_entry->barrier_queue, ^{
1102                        _dispatch_io_debug("fd entry init completion", fd);
1103                        completion_callback(fd_entry);
1104                        // stat() is complete, release reference to fd_entry
1105                        _dispatch_fd_entry_release(fd_entry);
1106                });
1107        });
1108}
1109
1110static dispatch_fd_entry_t
1111_dispatch_fd_entry_create(dispatch_queue_t q)
1112{
1113        dispatch_fd_entry_t fd_entry;
1114        fd_entry = calloc(1ul, sizeof(struct dispatch_fd_entry_s));
1115        fd_entry->close_queue = dispatch_queue_create(
1116                        "com.apple.libdispatch-io.closeq", NULL);
1117        // Use target queue to ensure that no concurrent lookups are going on when
1118        // the close queue is running
1119        fd_entry->close_queue->do_targetq = q;
1120        _dispatch_retain(q);
1121        // Suspend the cleanup queue until closing
1122        _dispatch_fd_entry_retain(fd_entry);
1123        return fd_entry;
1124}
1125
1126static dispatch_fd_entry_t
1127_dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
1128{
1129        // On fds lock queue
1130        _dispatch_io_debug("fd entry create", fd);
1131        dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
1132                        _dispatch_io_fds_lockq);
1133        fd_entry->fd = fd;
1134        TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list);
1135        fd_entry->barrier_queue = dispatch_queue_create(
1136                        "com.apple.libdispatch-io.barrierq", NULL);
1137        fd_entry->barrier_group = dispatch_group_create();
1138        dispatch_async(fd_entry->barrier_queue, ^{
1139                _dispatch_io_debug("fd entry stat", fd);
1140                int err, orig_flags, orig_nosigpipe = -1;
1141                struct stat st;
1142                _dispatch_io_syscall_switch(err,
1143                        fstat(fd, &st),
1144                        default: fd_entry->err = err; return;
1145                );
1146                fd_entry->stat.dev = st.st_dev;
1147                fd_entry->stat.mode = st.st_mode;
1148                _dispatch_io_syscall_switch(err,
1149                        orig_flags = fcntl(fd, F_GETFL),
1150                        default: (void)dispatch_assume_zero(err); break;
1151                );
1152#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
1153                if (S_ISFIFO(st.st_mode)) {
1154                        _dispatch_io_syscall_switch(err,
1155                                orig_nosigpipe = fcntl(fd, F_GETNOSIGPIPE),
1156                                default: (void)dispatch_assume_zero(err); break;
1157                        );
1158                        if (orig_nosigpipe != -1) {
1159                                _dispatch_io_syscall_switch(err,
1160                                        orig_nosigpipe = fcntl(fd, F_SETNOSIGPIPE, 1),
1161                                        default:
1162                                                orig_nosigpipe = -1;
1163                                                (void)dispatch_assume_zero(err);
1164                                                break;
1165                                );
1166                        }
1167                }
1168#endif
1169                if (S_ISREG(st.st_mode)) {
1170                        if (orig_flags != -1) {
1171                                _dispatch_io_syscall_switch(err,
1172                                        fcntl(fd, F_SETFL, orig_flags & ~O_NONBLOCK),
1173                                        default:
1174                                                orig_flags = -1;
1175                                                (void)dispatch_assume_zero(err);
1176                                                break;
1177                                );
1178                        }
1179                        int32_t dev = major(st.st_dev);
1180                        // We have to get the disk on the global dev queue. The
1181                        // barrier queue cannot continue until that is complete
1182                        dispatch_suspend(fd_entry->barrier_queue);
1183                        dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
1184                                        _dispatch_io_devs_lockq_init);
1185                        dispatch_async(_dispatch_io_devs_lockq, ^{
1186                                _dispatch_disk_init(fd_entry, dev);
1187                                dispatch_resume(fd_entry->barrier_queue);
1188                        });
1189                } else {
1190                        if (orig_flags != -1) {
1191                                _dispatch_io_syscall_switch(err,
1192                                        fcntl(fd, F_SETFL, orig_flags | O_NONBLOCK),
1193                                        default:
1194                                                orig_flags = -1;
1195                                                (void)dispatch_assume_zero(err);
1196                                                break;
1197                                );
1198                        }
1199                        _dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
1200                                        DISPATCH_QUEUE_PRIORITY_DEFAULT, false));
1201                }
1202                fd_entry->orig_flags = orig_flags;
1203                fd_entry->orig_nosigpipe = orig_nosigpipe;
1204        });
1205        // This is the first item run when the close queue is resumed, indicating
1206        // that all channels associated with this entry have been closed and that
1207        // all operations associated with this entry have been freed
1208        dispatch_async(fd_entry->close_queue, ^{
1209                if (!fd_entry->disk) {
1210                        _dispatch_io_debug("close queue fd_entry cleanup", fd);
1211                        dispatch_op_direction_t dir;
1212                        for (dir = 0; dir < DOP_DIR_MAX; dir++) {
1213                                _dispatch_stream_dispose(fd_entry, dir);
1214                        }
1215                } else {
1216                        dispatch_disk_t disk = fd_entry->disk;
1217                        dispatch_async(_dispatch_io_devs_lockq, ^{
1218                                _dispatch_release(disk);
1219                        });
1220                }
1221                // Remove this entry from the global fd list
1222                TAILQ_REMOVE(&_dispatch_io_fds[hash], fd_entry, fd_list);
1223        });
1224        // If there was a source associated with this stream, disposing of the
1225        // source cancels it and suspends the close queue. Freeing the fd_entry
1226        // structure must happen after the source cancel handler has finished
1227        dispatch_async(fd_entry->close_queue, ^{
1228                _dispatch_io_debug("close queue release", fd);
1229                dispatch_release(fd_entry->close_queue);
1230                _dispatch_io_debug("barrier queue release", fd);
1231                dispatch_release(fd_entry->barrier_queue);
1232                _dispatch_io_debug("barrier group release", fd);
1233                dispatch_release(fd_entry->barrier_group);
1234                if (fd_entry->orig_flags != -1) {
1235                        _dispatch_io_syscall(
1236                                fcntl(fd, F_SETFL, fd_entry->orig_flags)
1237                        );
1238                }
1239#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
1240                if (fd_entry->orig_nosigpipe != -1) {
1241                        _dispatch_io_syscall(
1242                                fcntl(fd, F_SETNOSIGPIPE, fd_entry->orig_nosigpipe)
1243                        );
1244                }
1245#endif
1246                if (fd_entry->convenience_channel) {
1247                        fd_entry->convenience_channel->fd_entry = NULL;
1248                        dispatch_release(fd_entry->convenience_channel);
1249                }
1250                free(fd_entry);
1251        });
1252        return fd_entry;
1253}
1254
1255static dispatch_fd_entry_t
1256_dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data,
1257                dev_t dev, mode_t mode)
1258{
1259        // On devs lock queue
1260        _dispatch_io_debug("fd entry create with path %s", -1, path_data->path);
1261        dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
1262                        path_data->channel->queue);
1263        if (S_ISREG(mode)) {
1264                _dispatch_disk_init(fd_entry, major(dev));
1265        } else {
1266                _dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
1267                                DISPATCH_QUEUE_PRIORITY_DEFAULT, false));
1268        }
1269        fd_entry->fd = -1;
1270        fd_entry->orig_flags = -1;
1271        fd_entry->path_data = path_data;
1272        fd_entry->stat.dev = dev;
1273        fd_entry->stat.mode = mode;
1274        fd_entry->barrier_queue = dispatch_queue_create(
1275                        "com.apple.libdispatch-io.barrierq", NULL);
1276        fd_entry->barrier_group = dispatch_group_create();
1277        // This is the first item run when the close queue is resumed, indicating
1278        // that the channel associated with this entry has been closed and that
1279        // all operations associated with this entry have been freed
1280        dispatch_async(fd_entry->close_queue, ^{
1281                _dispatch_io_debug("close queue fd_entry cleanup", -1);
1282                if (!fd_entry->disk) {
1283                        dispatch_op_direction_t dir;
1284                        for (dir = 0; dir < DOP_DIR_MAX; dir++) {
1285                                _dispatch_stream_dispose(fd_entry, dir);
1286                        }
1287                }
1288                if (fd_entry->fd != -1) {
1289                        close(fd_entry->fd);
1290                }
1291                if (fd_entry->path_data->channel) {
1292                        // If associated channel has not been released yet, mark it as
1293                        // no longer having an fd_entry (for stop after close).
1294                        // It is safe to modify channel since we are on close_queue with
1295                        // target queue the channel queue
1296                        fd_entry->path_data->channel->fd_entry = NULL;
1297                }
1298        });
1299        dispatch_async(fd_entry->close_queue, ^{
1300                _dispatch_io_debug("close queue release", -1);
1301                dispatch_release(fd_entry->close_queue);
1302                dispatch_release(fd_entry->barrier_queue);
1303                dispatch_release(fd_entry->barrier_group);
1304                free(fd_entry->path_data);
1305                free(fd_entry);
1306        });
1307        return fd_entry;
1308}
1309
1310static int
1311_dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, dispatch_io_t channel)
1312{
1313        if (!(fd_entry->fd == -1 && fd_entry->path_data)) {
1314                return 0;
1315        }
1316        if (fd_entry->err) {
1317                return fd_entry->err;
1318        }
1319        int fd = -1;
1320        int oflag = fd_entry->disk ? fd_entry->path_data->oflag & ~O_NONBLOCK :
1321                        fd_entry->path_data->oflag | O_NONBLOCK;
1322open:
1323        fd = open(fd_entry->path_data->path, oflag, fd_entry->path_data->mode);
1324        if (fd == -1) {
1325                int err = errno;
1326                if (err == EINTR) {
1327                        goto open;
1328                }
1329                (void)dispatch_atomic_cmpxchg2o(fd_entry, err, 0, err);
1330                return err;
1331        }
1332        if (!dispatch_atomic_cmpxchg2o(fd_entry, fd, -1, fd)) {
1333                // Lost the race with another open
1334                close(fd);
1335        } else {
1336                channel->fd_actual = fd;
1337        }
1338        return 0;
1339}
1340
1341static void
1342_dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
1343                dispatch_io_t channel)
1344{
1345        if (fd_entry->disk) {
1346                if (channel) {
1347                        _dispatch_retain(channel);
1348                }
1349                _dispatch_fd_entry_retain(fd_entry);
1350                dispatch_async(fd_entry->disk->pick_queue, ^{
1351                        _dispatch_disk_cleanup_operations(fd_entry->disk, channel);
1352                        _dispatch_fd_entry_release(fd_entry);
1353                        if (channel) {
1354                                _dispatch_release(channel);
1355                        }
1356                });
1357        } else {
1358                dispatch_op_direction_t direction;
1359                for (direction = 0; direction < DOP_DIR_MAX; direction++) {
1360                        dispatch_stream_t stream = fd_entry->streams[direction];
1361                        if (!stream) {
1362                                continue;
1363                        }
1364                        if (channel) {
1365                                _dispatch_retain(channel);
1366                        }
1367                        _dispatch_fd_entry_retain(fd_entry);
1368                        dispatch_async(stream->dq, ^{
1369                                _dispatch_stream_cleanup_operations(stream, channel);
1370                                _dispatch_fd_entry_release(fd_entry);
1371                                if (channel) {
1372                                        _dispatch_release(channel);
1373                                }
1374                        });
1375                }
1376        }
1377}
1378
1379#pragma mark -
1380#pragma mark dispatch_stream_t/dispatch_disk_t
1381
1382static void
1383_dispatch_stream_init(dispatch_fd_entry_t fd_entry, dispatch_queue_t tq)
1384{
1385        dispatch_op_direction_t direction;
1386        for (direction = 0; direction < DOP_DIR_MAX; direction++) {
1387                dispatch_stream_t stream;
1388                stream = calloc(1ul, sizeof(struct dispatch_stream_s));
1389                stream->dq = dispatch_queue_create("com.apple.libdispatch-io.streamq",
1390                                NULL);
1391                _dispatch_retain(tq);
1392                stream->dq->do_targetq = tq;
1393                TAILQ_INIT(&stream->operations[DISPATCH_IO_RANDOM]);
1394                TAILQ_INIT(&stream->operations[DISPATCH_IO_STREAM]);
1395                fd_entry->streams[direction] = stream;
1396        }
1397}
1398
1399static void
1400_dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
1401                dispatch_op_direction_t direction)
1402{
1403        // On close queue
1404        dispatch_stream_t stream = fd_entry->streams[direction];
1405        if (!stream) {
1406                return;
1407        }
1408        dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
1409        dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM]));
1410        if (stream->source) {
1411                // Balanced by source cancel handler:
1412                _dispatch_fd_entry_retain(fd_entry);
1413                dispatch_source_cancel(stream->source);
1414                dispatch_resume(stream->source);
1415                dispatch_release(stream->source);
1416        }
1417        dispatch_release(stream->dq);
1418        free(stream);
1419}
1420
1421static void
1422_dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev)
1423{
1424        // On devs lock queue
1425        dispatch_disk_t disk;
1426        char label_name[256];
1427        // Check to see if there is an existing entry for the given device
1428        uintptr_t hash = DIO_HASH(dev);
1429        TAILQ_FOREACH(disk, &_dispatch_io_devs[hash], disk_list) {
1430                if (disk->dev == dev) {
1431                        _dispatch_retain(disk);
1432                        goto out;
1433                }
1434        }
1435        // Otherwise create a new entry
1436        size_t pending_reqs_depth = dispatch_io_defaults.max_pending_io_reqs;
1437        disk = calloc(1ul, sizeof(struct dispatch_disk_s) + (pending_reqs_depth *
1438                        sizeof(dispatch_operation_t)));
1439        disk->do_vtable = &_dispatch_disk_vtable;
1440        disk->do_next = DISPATCH_OBJECT_LISTLESS;
1441        disk->do_ref_cnt = 1;
1442        disk->do_xref_cnt = 0;
1443        disk->advise_list_depth = pending_reqs_depth;
1444        disk->do_targetq = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
1445                        false);
1446        disk->dev = dev;
1447        TAILQ_INIT(&disk->operations);
1448        disk->cur_rq = TAILQ_FIRST(&disk->operations);
1449        sprintf(label_name, "com.apple.libdispatch-io.deviceq.%d", dev);
1450        disk->pick_queue = dispatch_queue_create(label_name, NULL);
1451        TAILQ_INSERT_TAIL(&_dispatch_io_devs[hash], disk, disk_list);
1452out:
1453        fd_entry->disk = disk;
1454        TAILQ_INIT(&fd_entry->stream_ops);
1455}
1456
1457static void
1458_dispatch_disk_dispose(dispatch_disk_t disk)
1459{
1460        uintptr_t hash = DIO_HASH(disk->dev);
1461        TAILQ_REMOVE(&_dispatch_io_devs[hash], disk, disk_list);
1462        dispatch_assert(TAILQ_EMPTY(&disk->operations));
1463        size_t i;
1464        for (i=0; i<disk->advise_list_depth; ++i) {
1465                dispatch_assert(!disk->advise_list[i]);
1466        }
1467        dispatch_release(disk->pick_queue);
1468        free(disk);
1469}
1470
1471#pragma mark -
1472#pragma mark dispatch_stream_operations/dispatch_disk_operations
1473
1474static inline bool
1475_dispatch_stream_operation_avail(dispatch_stream_t stream)
1476{
1477        return  !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) ||
1478                        !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
1479}
1480
1481static void
1482_dispatch_stream_enqueue_operation(dispatch_stream_t stream,
1483                dispatch_operation_t op, dispatch_data_t data)
1484{
1485        if (!_dispatch_operation_should_enqueue(op, stream->dq, data)) {
1486                return;
1487        }
1488        bool no_ops = !_dispatch_stream_operation_avail(stream);
1489        TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op, operation_list);
1490        if (no_ops) {
1491                dispatch_async_f(stream->dq, stream, _dispatch_stream_handler);
1492        }
1493}
1494
1495static void
1496_dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op,
1497                dispatch_data_t data)
1498{
1499        if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) {
1500                return;
1501        }
1502        if (op->params.type == DISPATCH_IO_STREAM) {
1503                if (TAILQ_EMPTY(&op->fd_entry->stream_ops)) {
1504                        TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
1505                }
1506                TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list);
1507        } else {
1508                TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
1509        }
1510        _dispatch_disk_handler(disk);
1511}
1512
1513static void
1514_dispatch_stream_complete_operation(dispatch_stream_t stream,
1515                dispatch_operation_t op)
1516{
1517        // On stream queue
1518        _dispatch_io_debug("complete operation", op->fd_entry->fd);
1519        TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list);
1520        if (op == stream->op) {
1521                stream->op = NULL;
1522        }
1523        if (op->timer) {
1524                dispatch_source_cancel(op->timer);
1525        }
1526        // Final release will deliver any pending data
1527        _dispatch_release(op);
1528}
1529
1530static void
1531_dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op)
1532{
1533        // On pick queue
1534        _dispatch_io_debug("complete operation", op->fd_entry->fd);
1535        // Current request is always the last op returned
1536        if (disk->cur_rq == op) {
1537                disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s,
1538                                operation_list);
1539        }
1540        if (op->params.type == DISPATCH_IO_STREAM) {
1541                // Check if there are other pending stream operations behind it
1542                dispatch_operation_t op_next = TAILQ_NEXT(op, stream_list);
1543                TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list);
1544                if (op_next) {
1545                        TAILQ_INSERT_TAIL(&disk->operations, op_next, operation_list);
1546                }
1547        }
1548        TAILQ_REMOVE(&disk->operations, op, operation_list);
1549        if (op->timer) {
1550                dispatch_source_cancel(op->timer);
1551        }
1552        // Final release will deliver any pending data
1553        _dispatch_release(op);
1554}
1555
1556static dispatch_operation_t
1557_dispatch_stream_pick_next_operation(dispatch_stream_t stream,
1558                dispatch_operation_t op)
1559{
1560        // On stream queue
1561        if (!op) {
1562                // On the first run through, pick the first operation
1563                if (!_dispatch_stream_operation_avail(stream)) {
1564                        return op;
1565                }
1566                if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])) {
1567                        op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]);
1568                } else if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) {
1569                        op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
1570                }
1571                return op;
1572        }
1573        if (op->params.type == DISPATCH_IO_STREAM) {
1574                // Stream operations need to be serialized so continue the current
1575                // operation until it is finished
1576                return op;
1577        }
1578        // Get the next random operation (round-robin)
1579        if (op->params.type == DISPATCH_IO_RANDOM) {
1580                op = TAILQ_NEXT(op, operation_list);
1581                if (!op) {
1582                        op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
1583                }
1584                return op;
1585        }
1586        return NULL;
1587}
1588
1589static dispatch_operation_t
1590_dispatch_disk_pick_next_operation(dispatch_disk_t disk)
1591{
1592        // On pick queue
1593        dispatch_operation_t op;
1594        if (!TAILQ_EMPTY(&disk->operations)) {
1595                if (disk->cur_rq == NULL) {
1596                        op = TAILQ_FIRST(&disk->operations);
1597                } else {
1598                        op = disk->cur_rq;
1599                        do {
1600                                op = TAILQ_NEXT(op, operation_list);
1601                                if (!op) {
1602                                        op = TAILQ_FIRST(&disk->operations);
1603                                }
1604                                // TODO: more involved picking algorithm rdar://problem/8780312
1605                        } while (op->active && op != disk->cur_rq);
1606                }
1607                if (!op->active) {
1608                        disk->cur_rq = op;
1609                        return op;
1610                }
1611        }
1612        return NULL;
1613}
1614
1615static void
1616_dispatch_stream_cleanup_operations(dispatch_stream_t stream,
1617                dispatch_io_t channel)
1618{
1619        // On stream queue
1620        dispatch_operation_t op, tmp;
1621        typeof(*stream->operations) *operations;
1622        operations = &stream->operations[DISPATCH_IO_RANDOM];
1623        TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1624                if (!channel || op->channel == channel) {
1625                        _dispatch_stream_complete_operation(stream, op);
1626                }
1627        }
1628        operations = &stream->operations[DISPATCH_IO_STREAM];
1629        TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1630                if (!channel || op->channel == channel) {
1631                        _dispatch_stream_complete_operation(stream, op);
1632                }
1633        }
1634        if (stream->source_running && !_dispatch_stream_operation_avail(stream)) {
1635                dispatch_suspend(stream->source);
1636                stream->source_running = false;
1637        }
1638}
1639
1640static void
1641_dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel)
1642{
1643        // On pick queue
1644        dispatch_operation_t op, tmp;
1645        TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) {
1646                if (!channel || op->channel == channel) {
1647                        _dispatch_disk_complete_operation(disk, op);
1648                }
1649        }
1650}
1651
1652#pragma mark -
1653#pragma mark dispatch_stream_handler/dispatch_disk_handler
1654
1655static dispatch_source_t
1656_dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op)
1657{
1658        // On stream queue
1659        if (stream->source) {
1660                return stream->source;
1661        }
1662        dispatch_fd_t fd = op->fd_entry->fd;
1663        _dispatch_io_debug("stream source create", fd);
1664        dispatch_source_t source = NULL;
1665        if (op->direction == DOP_DIR_READ) {
1666                source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, fd, 0,
1667                                stream->dq);
1668        } else if (op->direction == DOP_DIR_WRITE) {
1669                source = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, fd, 0,
1670                                stream->dq);
1671        } else {
1672                dispatch_assert(op->direction < DOP_DIR_MAX);
1673                return NULL;
1674        }
1675        dispatch_set_context(source, stream);
1676        dispatch_source_set_event_handler_f(source,
1677                        _dispatch_stream_source_handler);
1678        // Close queue must not run user cleanup handlers until sources are fully
1679        // unregistered
1680        dispatch_queue_t close_queue = op->fd_entry->close_queue;
1681        dispatch_source_set_cancel_handler(source, ^{
1682                _dispatch_io_debug("stream source cancel", fd);
1683                dispatch_resume(close_queue);
1684        });
1685        stream->source = source;
1686        return stream->source;
1687}
1688
1689static void
1690_dispatch_stream_source_handler(void *ctx)
1691{
1692        // On stream queue
1693        dispatch_stream_t stream = (dispatch_stream_t)ctx;
1694        dispatch_suspend(stream->source);
1695        stream->source_running = false;
1696        return _dispatch_stream_handler(stream);
1697}
1698
1699static void
1700_dispatch_stream_handler(void *ctx)
1701{
1702        // On stream queue
1703        dispatch_stream_t stream = (dispatch_stream_t)ctx;
1704        dispatch_operation_t op;
1705pick:
1706        op = _dispatch_stream_pick_next_operation(stream, stream->op);
1707        if (!op) {
1708                _dispatch_debug("no operation found: stream %p", stream);
1709                return;
1710        }
1711        int err = _dispatch_io_get_error(op, NULL, true);
1712        if (err) {
1713                op->err = err;
1714                _dispatch_stream_complete_operation(stream, op);
1715                goto pick;
1716        }
1717        stream->op = op;
1718        _dispatch_io_debug("stream handler", op->fd_entry->fd);
1719        dispatch_fd_entry_t fd_entry = op->fd_entry;
1720        _dispatch_fd_entry_retain(fd_entry);
1721        // For performance analysis
1722        if (!op->total && dispatch_io_defaults.initial_delivery) {
1723                // Empty delivery to signal the start of the operation
1724                _dispatch_io_debug("initial delivery", op->fd_entry->fd);
1725                _dispatch_operation_deliver_data(op, DOP_DELIVER);
1726        }
1727        // TODO: perform on the operation target queue to get correct priority
1728        int result = _dispatch_operation_perform(op), flags = -1;
1729        switch (result) {
1730        case DISPATCH_OP_DELIVER:
1731                flags = DOP_DEFAULT;
1732                // Fall through
1733        case DISPATCH_OP_DELIVER_AND_COMPLETE:
1734                flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY :
1735                                DOP_DEFAULT;
1736                _dispatch_operation_deliver_data(op, flags);
1737                // Fall through
1738        case DISPATCH_OP_COMPLETE:
1739                if (flags != DOP_DEFAULT) {
1740                        _dispatch_stream_complete_operation(stream, op);
1741                }
1742                if (_dispatch_stream_operation_avail(stream)) {
1743                        dispatch_async_f(stream->dq, stream, _dispatch_stream_handler);
1744                }
1745                break;
1746        case DISPATCH_OP_COMPLETE_RESUME:
1747                _dispatch_stream_complete_operation(stream, op);
1748                // Fall through
1749        case DISPATCH_OP_RESUME:
1750                if (_dispatch_stream_operation_avail(stream)) {
1751                        stream->source_running = true;
1752                        dispatch_resume(_dispatch_stream_source(stream, op));
1753                }
1754                break;
1755        case DISPATCH_OP_ERR:
1756                _dispatch_stream_cleanup_operations(stream, op->channel);
1757                break;
1758        case DISPATCH_OP_FD_ERR:
1759                _dispatch_fd_entry_retain(fd_entry);
1760                dispatch_async(fd_entry->barrier_queue, ^{
1761                        _dispatch_fd_entry_cleanup_operations(fd_entry, NULL);
1762                        _dispatch_fd_entry_release(fd_entry);
1763                });
1764                break;
1765        default:
1766                break;
1767        }
1768        _dispatch_fd_entry_release(fd_entry);
1769        return;
1770}
1771
1772static void
1773_dispatch_disk_handler(void *ctx)
1774{
1775        // On pick queue
1776        dispatch_disk_t disk = (dispatch_disk_t)ctx;
1777        if (disk->io_active) {
1778                return;
1779        }
1780        _dispatch_io_debug("disk handler", -1);
1781        dispatch_operation_t op;
1782        size_t i = disk->free_idx, j = disk->req_idx;
1783        if (j <= i) {
1784                j += disk->advise_list_depth;
1785        }
1786        while (i <= j) {
1787                if ((!disk->advise_list[i%disk->advise_list_depth]) &&
1788                                (op = _dispatch_disk_pick_next_operation(disk))) {
1789                        int err = _dispatch_io_get_error(op, NULL, true);
1790                        if (err) {
1791                                op->err = err;
1792                                _dispatch_disk_complete_operation(disk, op);
1793                                continue;
1794                        }
1795                        _dispatch_retain(op);
1796                        disk->advise_list[i%disk->advise_list_depth] = op;
1797                        op->active = true;
1798                } else {
1799                        // No more operations to get
1800                        break;
1801                }
1802                i++;
1803        }
1804        disk->free_idx = (i%disk->advise_list_depth);
1805        op = disk->advise_list[disk->req_idx];
1806        if (op) {
1807                disk->io_active = true;
1808                dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform);
1809        }
1810}
1811
1812static void
1813_dispatch_disk_perform(void *ctxt)
1814{
1815        dispatch_disk_t disk = ctxt;
1816        size_t chunk_size = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
1817        _dispatch_io_debug("disk perform", -1);
1818        dispatch_operation_t op;
1819        size_t i = disk->advise_idx, j = disk->free_idx;
1820        if (j <= i) {
1821                j += disk->advise_list_depth;
1822        }
1823        do {
1824                op = disk->advise_list[i%disk->advise_list_depth];
1825                if (!op) {
1826                        // Nothing more to advise, must be at free_idx
1827                        dispatch_assert(i%disk->advise_list_depth == disk->free_idx);
1828                        break;
1829                }
1830                if (op->direction == DOP_DIR_WRITE) {
1831                        // TODO: preallocate writes ? rdar://problem/9032172
1832                        continue;
1833                }
1834                if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry,
1835                                op->channel)) {
1836                        continue;
1837                }
1838                // For performance analysis
1839                if (!op->total && dispatch_io_defaults.initial_delivery) {
1840                        // Empty delivery to signal the start of the operation
1841                        _dispatch_io_debug("initial delivery", op->fd_entry->fd);
1842                        _dispatch_operation_deliver_data(op, DOP_DELIVER);
1843                }
1844                // Advise two chunks if the list only has one element and this is the
1845                // first advise on the operation
1846                if ((j-i) == 1 && !disk->advise_list[disk->free_idx] &&
1847                                !op->advise_offset) {
1848                        chunk_size *= 2;
1849                }
1850                _dispatch_operation_advise(op, chunk_size);
1851        } while (++i < j);
1852        disk->advise_idx = i%disk->advise_list_depth;
1853        op = disk->advise_list[disk->req_idx];
1854        int result = _dispatch_operation_perform(op);
1855        disk->advise_list[disk->req_idx] = NULL;
1856        disk->req_idx = (++disk->req_idx)%disk->advise_list_depth;
1857        dispatch_async(disk->pick_queue, ^{
1858                switch (result) {
1859                case DISPATCH_OP_DELIVER:
1860                        _dispatch_operation_deliver_data(op, DOP_DELIVER);
1861                        break;
1862                case DISPATCH_OP_COMPLETE:
1863                        _dispatch_disk_complete_operation(disk, op);
1864                        break;
1865                case DISPATCH_OP_DELIVER_AND_COMPLETE:
1866                        _dispatch_operation_deliver_data(op, DOP_DELIVER);
1867                        _dispatch_disk_complete_operation(disk, op);
1868                        break;
1869                case DISPATCH_OP_ERR:
1870                        _dispatch_disk_cleanup_operations(disk, op->channel);
1871                        break;
1872                case DISPATCH_OP_FD_ERR:
1873                        _dispatch_disk_cleanup_operations(disk, NULL);
1874                        break;
1875                default:
1876                        dispatch_assert(result);
1877                        break;
1878                }
1879                op->active = false;
1880                disk->io_active = false;
1881                _dispatch_disk_handler(disk);
1882                // Balancing the retain in _dispatch_disk_handler. Note that op must be
1883                // released at the very end, since it might hold the last reference to
1884                // the disk
1885                _dispatch_release(op);
1886        });
1887}
1888
1889#pragma mark -
1890#pragma mark dispatch_operation_perform
1891
1892static void
1893_dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
1894{
1895        int err;
1896        struct radvisory advise;
1897        // No point in issuing a read advise for the next chunk if we are already
1898        // a chunk ahead from reading the bytes
1899        if (op->advise_offset > (off_t)((op->offset+op->total) + chunk_size +
1900                        PAGE_SIZE)) {
1901                return;
1902        }
1903        advise.ra_count = (int)chunk_size;
1904        if (!op->advise_offset) {
1905                op->advise_offset = op->offset;
1906                // If this is the first time through, align the advised range to a
1907                // page boundary
1908                size_t pg_fraction = (size_t)((op->offset + chunk_size) % PAGE_SIZE);
1909                advise.ra_count += (int)(pg_fraction ? PAGE_SIZE - pg_fraction : 0);
1910        }
1911        advise.ra_offset = op->advise_offset;
1912        op->advise_offset += advise.ra_count;
1913        _dispatch_io_syscall_switch(err,
1914                fcntl(op->fd_entry->fd, F_RDADVISE, &advise),
1915                // TODO: set disk status on error
1916                default: (void)dispatch_assume_zero(err); break;
1917        );
1918}
1919
1920static int
1921_dispatch_operation_perform(dispatch_operation_t op)
1922{
1923        int err = _dispatch_io_get_error(op, NULL, true);
1924        if (err) {
1925                goto error;
1926        }
1927        if (!op->buf) {
1928                size_t max_buf_siz = op->params.high;
1929                size_t chunk_siz = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
1930                if (op->direction == DOP_DIR_READ) {
1931                        // If necessary, create a buffer for the ongoing operation, large
1932                        // enough to fit chunk_pages but at most high-water
1933                        size_t data_siz = dispatch_data_get_size(op->data);
1934                        if (data_siz) {
1935                                dispatch_assert(data_siz < max_buf_siz);
1936                                max_buf_siz -= data_siz;
1937                        }
1938                        if (max_buf_siz > chunk_siz) {
1939                                max_buf_siz = chunk_siz;
1940                        }
1941                        if (op->length < SIZE_MAX) {
1942                                op->buf_siz = op->length - op->total;
1943                                if (op->buf_siz > max_buf_siz) {
1944                                        op->buf_siz = max_buf_siz;
1945                                }
1946                        } else {
1947                                op->buf_siz = max_buf_siz;
1948                        }
1949                        op->buf = valloc(op->buf_siz);
1950                        _dispatch_io_debug("buffer allocated", op->fd_entry->fd);
1951                } else if (op->direction == DOP_DIR_WRITE) {
1952                        // Always write the first data piece, if that is smaller than a
1953                        // chunk, accumulate further data pieces until chunk size is reached
1954                        if (chunk_siz > max_buf_siz) {
1955                                chunk_siz = max_buf_siz;
1956                        }
1957                        op->buf_siz = 0;
1958                        dispatch_data_apply(op->data,
1959                                        ^(dispatch_data_t region DISPATCH_UNUSED,
1960                                        size_t offset DISPATCH_UNUSED,
1961                                        const void* buf DISPATCH_UNUSED, size_t len) {
1962                                size_t siz = op->buf_siz + len;
1963                                if (!op->buf_siz || siz <= chunk_siz) {
1964                                        op->buf_siz = siz;
1965                                }
1966                                return (bool)(siz < chunk_siz);
1967                        });
1968                        if (op->buf_siz > max_buf_siz) {
1969                                op->buf_siz = max_buf_siz;
1970                        }
1971                        dispatch_data_t d;
1972                        d = dispatch_data_create_subrange(op->data, 0, op->buf_siz);
1973                        op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
1974                                        NULL);
1975                        _dispatch_io_data_release(d);
1976                        _dispatch_io_debug("buffer mapped", op->fd_entry->fd);
1977                }
1978        }
1979        if (op->fd_entry->fd == -1) {
1980                err = _dispatch_fd_entry_open(op->fd_entry, op->channel);
1981                if (err) {
1982                        goto error;
1983                }
1984        }
1985        void *buf = op->buf + op->buf_len;
1986        size_t len = op->buf_siz - op->buf_len;
1987        off_t off = op->offset + op->total;
1988        ssize_t processed = -1;
1989syscall:
1990        if (op->direction == DOP_DIR_READ) {
1991                if (op->params.type == DISPATCH_IO_STREAM) {
1992                        processed = read(op->fd_entry->fd, buf, len);
1993                } else if (op->params.type == DISPATCH_IO_RANDOM) {
1994                        processed = pread(op->fd_entry->fd, buf, len, off);
1995                }
1996        } else if (op->direction == DOP_DIR_WRITE) {
1997                if (op->params.type == DISPATCH_IO_STREAM) {
1998                        processed = write(op->fd_entry->fd, buf, len);
1999                } else if (op->params.type == DISPATCH_IO_RANDOM) {
2000                        processed = pwrite(op->fd_entry->fd, buf, len, off);
2001                }
2002        }
2003        // Encountered an error on the file descriptor
2004        if (processed == -1) {
2005                err = errno;
2006                if (err == EINTR) {
2007                        goto syscall;
2008                }
2009                goto error;
2010        }
2011        // EOF is indicated by two handler invocations
2012        if (processed == 0) {
2013                _dispatch_io_debug("EOF", op->fd_entry->fd);
2014                return DISPATCH_OP_DELIVER_AND_COMPLETE;
2015        }
2016        op->buf_len += processed;
2017        op->total += processed;
2018        if (op->total == op->length) {
2019                // Finished processing all the bytes requested by the operation
2020                return DISPATCH_OP_COMPLETE;
2021        } else {
2022                // Deliver data only if we satisfy the filters
2023                return DISPATCH_OP_DELIVER;
2024        }
2025error:
2026        if (err == EAGAIN) {
2027                // For disk based files with blocking I/O we should never get EAGAIN
2028                dispatch_assert(!op->fd_entry->disk);
2029                _dispatch_io_debug("EAGAIN %d", op->fd_entry->fd, err);
2030                if (op->direction == DOP_DIR_READ && op->total &&
2031                                op->channel == op->fd_entry->convenience_channel) {
2032                        // Convenience read with available data completes on EAGAIN
2033                        return DISPATCH_OP_COMPLETE_RESUME;
2034                }
2035                return DISPATCH_OP_RESUME;
2036        }
2037        op->err = err;
2038        switch (err) {
2039        case ECANCELED:
2040                return DISPATCH_OP_ERR;
2041        case EBADF:
2042                (void)dispatch_atomic_cmpxchg2o(op->fd_entry, err, 0, err);
2043                return DISPATCH_OP_FD_ERR;
2044        default:
2045                return DISPATCH_OP_COMPLETE;
2046        }
2047}
2048
2049static void
2050_dispatch_operation_deliver_data(dispatch_operation_t op,
2051                dispatch_op_flags_t flags)
2052{
2053        // Either called from stream resp. pick queue or when op is finalized
2054        dispatch_data_t data = NULL;
2055        int err = 0;
2056        size_t undelivered = op->undelivered + op->buf_len;
2057        bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) ||
2058                        (op->flags & DOP_DELIVER);
2059        op->flags = DOP_DEFAULT;
2060        if (!deliver) {
2061                // Don't deliver data until low water mark has been reached
2062                if (undelivered >= op->params.low) {
2063                        deliver = true;
2064                } else if (op->buf_len < op->buf_siz) {
2065                        // Request buffer is not yet used up
2066                        _dispatch_io_debug("buffer data", op->fd_entry->fd);
2067                        return;
2068                }
2069        } else {
2070                err = op->err;
2071                if (!err && (op->channel->atomic_flags & DIO_STOPPED)) {
2072                        err = ECANCELED;
2073                        op->err = err;
2074                }
2075        }
2076        // Deliver data or buffer used up
2077        if (op->direction == DOP_DIR_READ) {
2078                if (op->buf_len) {
2079                        void *buf = op->buf;
2080                        data = dispatch_data_create(buf, op->buf_len, NULL,
2081                                        DISPATCH_DATA_DESTRUCTOR_FREE);
2082                        op->buf = NULL;
2083                        op->buf_len = 0;
2084                        dispatch_data_t d = dispatch_data_create_concat(op->data, data);
2085                        _dispatch_io_data_release(op->data);
2086                        _dispatch_io_data_release(data);
2087                        data = d;
2088                } else {
2089                        data = op->data;
2090                }
2091                op->data = deliver ? dispatch_data_empty : data;
2092        } else if (op->direction == DOP_DIR_WRITE) {
2093                if (deliver) {
2094                        data = dispatch_data_create_subrange(op->data, op->buf_len,
2095                                        op->length);
2096                }
2097                if (op->buf_len == op->buf_siz) {
2098                        _dispatch_io_data_release(op->buf_data);
2099                        op->buf_data = NULL;
2100                        op->buf = NULL;
2101                        op->buf_len = 0;
2102                        // Trim newly written buffer from head of unwritten data
2103                        dispatch_data_t d;
2104                        if (deliver) {
2105                                _dispatch_io_data_retain(data);
2106                                d = data;
2107                        } else {
2108                                d = dispatch_data_create_subrange(op->data, op->buf_len,
2109                                                op->length);
2110                        }
2111                        _dispatch_io_data_release(op->data);
2112                        op->data = d;
2113                }
2114        } else {
2115                dispatch_assert(op->direction < DOP_DIR_MAX);
2116                return;
2117        }
2118        if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) {
2119                op->undelivered = undelivered;
2120                _dispatch_io_debug("buffer data", op->fd_entry->fd);
2121                return;
2122        }
2123        op->undelivered = 0;
2124        _dispatch_io_debug("deliver data", op->fd_entry->fd);
2125        dispatch_op_direction_t direction = op->direction;
2126        __block dispatch_data_t d = data;
2127        dispatch_io_handler_t handler = op->handler;
2128#if DISPATCH_IO_DEBUG
2129        int fd = op->fd_entry->fd;
2130#endif
2131        dispatch_fd_entry_t fd_entry = op->fd_entry;
2132        _dispatch_fd_entry_retain(fd_entry);
2133        dispatch_io_t channel = op->channel;
2134        _dispatch_retain(channel);
2135        // Note that data delivery may occur after the operation is freed
2136        dispatch_async(op->op_q, ^{
2137                bool done = (flags & DOP_DONE);
2138                if (done) {
2139                        if (direction == DOP_DIR_READ && err) {
2140                                if (dispatch_data_get_size(d)) {
2141                                        _dispatch_io_debug("IO handler invoke", fd);
2142                                        handler(false, d, 0);
2143                                }
2144                                d = NULL;
2145                        } else if (direction == DOP_DIR_WRITE && !err) {
2146                                d = NULL;
2147                        }
2148                }
2149                _dispatch_io_debug("IO handler invoke", fd);
2150                handler(done, d, err);
2151                _dispatch_release(channel);
2152                _dispatch_fd_entry_release(fd_entry);
2153                _dispatch_io_data_release(data);
2154        });
2155}
Note: See TracBrowser for help on using the repository browser.