From 7ca0d146ecaf2f99781653d1203bd3db7afc85ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20Ter=C3=A4s?= Date: Fri, 10 Jan 2020 11:02:48 +0200 Subject: [PATCH] istream: add buffering capability Convert all implementations to do buffering. This is in preparation to remove bstream interface as redundant. istream_read() will return full reads unless end-of-file. The backends can return short reads to optimize buffering or due to other reasons like boundary change for gz. --- src/apk_io.h | 75 +++++++++++++++++++++++++---------------------- src/archive.c | 4 ++- src/gunzip.c | 34 ++++++++-------------- src/io.c | 80 ++++++++++++++++++++++++++++++--------------------- src/url.c | 19 +++++------- 5 files changed, 109 insertions(+), 103 deletions(-) diff --git a/src/apk_io.h b/src/apk_io.h index 2bb0f26..ea2eb42 100644 --- a/src/apk_io.h +++ b/src/apk_io.h @@ -53,6 +53,8 @@ struct apk_file_info { struct apk_xattr_array *xattrs; }; +extern size_t apk_io_bufsize; + struct apk_istream; struct apk_bstream; struct apk_ostream; @@ -63,10 +65,48 @@ struct apk_istream_ops { void (*close)(struct apk_istream *is); }; +#define APK_ISTREAM_SINGLE_READ 0x0001 + struct apk_istream { + uint8_t *ptr, *end, *buf; + size_t buf_size; + int err; + unsigned int flags; const struct apk_istream_ops *ops; }; +struct apk_istream *apk_istream_from_file(int atfd, const char *file); +struct apk_istream *apk_istream_from_file_gz(int atfd, const char *file); +struct apk_istream *apk_istream_from_fd(int fd); +struct apk_istream *apk_istream_from_fd_url_if_modified(int atfd, const char *url, time_t since); +struct apk_istream *apk_istream_from_url_gz(const char *url); +ssize_t apk_istream_read(struct apk_istream *is, void *ptr, size_t size); + +#define APK_SPLICE_ALL 0xffffffff +ssize_t apk_istream_splice(struct apk_istream *is, int fd, size_t size, + apk_progress_cb cb, void *cb_ctx); + +static inline struct apk_istream *apk_istream_from_url(const char *url) +{ + return apk_istream_from_fd_url_if_modified(AT_FDCWD, url, 0); +} +static inline struct apk_istream *apk_istream_from_fd_url(int atfd, const char *url) +{ + return apk_istream_from_fd_url_if_modified(atfd, url, 0); +} +static inline struct apk_istream *apk_istream_from_url_if_modified(const char *url, time_t since) +{ + return apk_istream_from_fd_url_if_modified(AT_FDCWD, url, since); +} +static inline void apk_istream_get_meta(struct apk_istream *is, struct apk_file_meta *meta) +{ + is->ops->get_meta(is, meta); +} +static inline void apk_istream_close(struct apk_istream *is) +{ + is->ops->close(is); +} + #define APK_BSTREAM_SINGLE_READ 0x0001 #define APK_BSTREAM_EOF 0x0002 @@ -106,41 +146,6 @@ static inline struct apk_istream *apk_bstream_gunzip(struct apk_bstream *bs) struct apk_ostream *apk_ostream_gzip(struct apk_ostream *); struct apk_ostream *apk_ostream_counter(off_t *); -struct apk_istream *apk_istream_from_file(int atfd, const char *file); -struct apk_istream *apk_istream_from_file_gz(int atfd, const char *file); -struct apk_istream *apk_istream_from_fd(int fd); -struct apk_istream *apk_istream_from_fd_url_if_modified(int atfd, const char *url, time_t since); -struct apk_istream *apk_istream_from_url_gz(const char *url); -ssize_t apk_istream_skip(struct apk_istream *istream, size_t size); - -#define APK_SPLICE_ALL 0xffffffff -ssize_t apk_istream_splice(struct apk_istream *is, int fd, size_t size, - apk_progress_cb cb, void *cb_ctx); - -static inline struct apk_istream *apk_istream_from_url(const char *url) -{ - return apk_istream_from_fd_url_if_modified(AT_FDCWD, url, 0); -} -static inline struct apk_istream *apk_istream_from_fd_url(int atfd, const char *url) -{ - return apk_istream_from_fd_url_if_modified(atfd, url, 0); -} -static inline struct apk_istream *apk_istream_from_url_if_modified(const char *url, time_t since) -{ - return apk_istream_from_fd_url_if_modified(AT_FDCWD, url, since); -} -static inline void apk_istream_get_meta(struct apk_istream *is, struct apk_file_meta *meta) -{ - is->ops->get_meta(is, meta); -} -static inline ssize_t apk_istream_read(struct apk_istream *is, void *ptr, size_t size) -{ - return is->ops->read(is, ptr, size); -} -static inline void apk_istream_close(struct apk_istream *is) -{ - is->ops->close(is); -} struct apk_bstream *apk_bstream_from_istream(struct apk_istream *istream); struct apk_bstream *apk_bstream_from_file(int atfd, const char *file); diff --git a/src/archive.c b/src/archive.c index db9242f..1d956f3 100644 --- a/src/archive.c +++ b/src/archive.c @@ -303,8 +303,10 @@ int apk_tar_parse(struct apk_istream *is, apk_archive_entry_parser parser, toskip += 512 - ((offset + toskip) & 511); offset += toskip; if (toskip != 0) { - if ((r = apk_istream_read(is, NULL, toskip)) != toskip) + if ((r = apk_istream_read(is, NULL, toskip)) != toskip) { + r = -EIO; goto err; + } } } diff --git a/src/gunzip.c b/src/gunzip.c index 2c35c5b..17b74a3 100644 --- a/src/gunzip.c +++ b/src/gunzip.c @@ -22,7 +22,6 @@ struct apk_gzip_istream { struct apk_istream is; struct apk_bstream *bs; z_stream zs; - int err; apk_multipart_cb cb; void *cbctx; @@ -40,9 +39,9 @@ static int gzi_boundary_change(struct apk_gzip_istream *gis) { int r; - r = gis->cb(gis->cbctx, gis->err ? APK_MPART_END : APK_MPART_BOUNDARY, gis->cbarg); + r = gis->cb(gis->cbctx, gis->is.err ? APK_MPART_END : APK_MPART_BOUNDARY, gis->cbarg); if (r > 0) r = -ECANCELED; - if (r != 0) gis->err = r; + if (r != 0) gis->is.err = r; return r; } @@ -51,19 +50,10 @@ static ssize_t gzi_read(struct apk_istream *is, void *ptr, size_t size) struct apk_gzip_istream *gis = container_of(is, struct apk_gzip_istream, is); int r; - if (gis->err != 0) { - if (gis->err > 0) - return 0; - return gis->err; - } - - if (ptr == NULL) - return apk_istream_skip(&gis->is, size); - gis->zs.avail_out = size; gis->zs.next_out = ptr; - while (gis->zs.avail_out != 0 && gis->err == 0) { + while (gis->zs.avail_out != 0 && gis->is.err == 0) { if (!APK_BLOB_IS_NULL(gis->cbarg)) { if (gzi_boundary_change(gis)) goto ret; @@ -83,10 +73,10 @@ static ssize_t gzi_read(struct apk_istream *is, void *ptr, size_t size) gis->zs.avail_in = blob.len; gis->zs.next_in = (void *) gis->cbprev; if (blob.len < 0) { - gis->err = blob.len; + gis->is.err = blob.len; goto ret; } else if (gis->zs.avail_in == 0) { - gis->err = 1; + gis->is.err = 1; gis->cbarg = APK_BLOB_NULL; gzi_boundary_change(gis); goto ret; @@ -99,7 +89,7 @@ static ssize_t gzi_read(struct apk_istream *is, void *ptr, size_t size) /* Digest the inflated bytes */ if ((gis->bs->flags & APK_BSTREAM_EOF) && gis->zs.avail_in == 0) - gis->err = 1; + gis->is.err = 1; if (gis->cb != NULL) { gis->cbarg = APK_BLOB_PTR_LEN(gis->cbprev, (void *) gis->zs.next_in - gis->cbprev); gis->cbprev = gis->zs.next_in; @@ -109,26 +99,24 @@ static ssize_t gzi_read(struct apk_istream *is, void *ptr, size_t size) * callback here, as we won't be called again. * For boundaries it should be postponed to not * be called until next gzip read is started. */ - if (gis->err) { + if (gis->is.err) { gzi_boundary_change(gis); goto ret; } inflateEnd(&gis->zs); if (inflateInit2(&gis->zs, 15+32) != Z_OK) return -ENOMEM; + if (gis->cb) goto ret; break; case Z_OK: break; default: - gis->err = -EIO; + gis->is.err = -EIO; break; } } ret: - if (size - gis->zs.avail_out == 0) - return gis->err < 0 ? gis->err : 0; - return size - gis->zs.avail_out; } @@ -154,11 +142,13 @@ struct apk_istream *apk_bstream_gunzip_mpart(struct apk_bstream *bs, if (IS_ERR_OR_NULL(bs)) return ERR_CAST(bs); - gis = malloc(sizeof(struct apk_gzip_istream)); + gis = malloc(sizeof(*gis) + apk_io_bufsize); if (!gis) goto err; *gis = (struct apk_gzip_istream) { .is.ops = &gunzip_istream_ops, + .is.buf = (uint8_t*)(gis + 1), + .is.buf_size = apk_io_bufsize, .bs = bs, .cb = cb, .cbctx = ctx, diff --git a/src/io.c b/src/io.c index 8aac019..d7c3bb9 100644 --- a/src/io.c +++ b/src/io.c @@ -35,6 +35,8 @@ #define HAVE_FGETGRENT_R #endif +size_t apk_io_bufsize = 2*1024; + static void apk_file_meta_from_fd(int fd, struct apk_file_meta *meta) { struct stat st; @@ -56,6 +58,44 @@ void apk_file_meta_to_fd(int fd, struct apk_file_meta *meta) futimens(fd, times); } +ssize_t apk_istream_read(struct apk_istream *is, void *ptr, size_t size) +{ + ssize_t left = size, r = 0; + + while (left) { + if (is->ptr != is->end) { + r = MIN(left, is->end - is->ptr); + if (ptr) { + memcpy(ptr, is->ptr, r); + ptr += r; + } + is->ptr += r; + left -= r; + continue; + } + if (is->err) break; + + if (ptr && left > is->buf_size/4) { + r = is->ops->read(is, ptr, left); + if (r <= 0) break; + left -= r; + ptr += r; + continue; + } + + r = is->ops->read(is, is->buf, is->buf_size); + if (r <= 0) break; + + is->ptr = is->buf; + is->end = is->buf + r; + } + + if (r < 0) return r; + if (size && left == size && !is->err) is->err = 1; + if (size == left) return is->err < 0 ? is->err : 0; + return size - left; +} + struct apk_fd_istream { struct apk_istream is; int fd; @@ -70,24 +110,11 @@ static void fdi_get_meta(struct apk_istream *is, struct apk_file_meta *meta) static ssize_t fdi_read(struct apk_istream *is, void *ptr, size_t size) { struct apk_fd_istream *fis = container_of(is, struct apk_fd_istream, is); - ssize_t i = 0, r; + ssize_t r; - if (ptr == NULL) { - if (lseek(fis->fd, size, SEEK_CUR) < 0) - return -errno; - return size; - } - - while (i < size) { - r = read(fis->fd, ptr + i, size - i); - if (r < 0) - return -errno; - if (r == 0) - break; - i += r; - } - - return i; + r = read(fis->fd, ptr, size); + if (r < 0) return -errno; + return r; } static void fdi_close(struct apk_istream *is) @@ -110,7 +137,7 @@ struct apk_istream *apk_istream_from_fd(int fd) if (fd < 0) return ERR_PTR(-EBADF); - fis = malloc(sizeof(struct apk_fd_istream)); + fis = malloc(sizeof(*fis) + apk_io_bufsize); if (fis == NULL) { close(fd); return ERR_PTR(-ENOMEM); @@ -118,6 +145,8 @@ struct apk_istream *apk_istream_from_fd(int fd) *fis = (struct apk_fd_istream) { .is.ops = &fd_istream_ops, + .is.buf = (uint8_t *)(fis + 1), + .is.buf_size = apk_io_bufsize, .fd = fd, }; @@ -134,21 +163,6 @@ struct apk_istream *apk_istream_from_file(int atfd, const char *file) return apk_istream_from_fd(fd); } -ssize_t apk_istream_skip(struct apk_istream *is, size_t size) -{ - unsigned char buf[2048]; - size_t done = 0, togo; - ssize_t r; - - while (done < size) { - togo = MIN(size - done, sizeof buf); - r = apk_istream_read(is, buf, togo); - if (r <= 0) return r ?: -EIO; - done += r; - } - return done; -} - ssize_t apk_istream_splice(struct apk_istream *is, int fd, size_t size, apk_progress_cb cb, void *cb_ctx) { diff --git a/src/url.c b/src/url.c index cacca81..d233e06 100644 --- a/src/url.c +++ b/src/url.c @@ -80,18 +80,11 @@ static void fetch_get_meta(struct apk_istream *is, struct apk_file_meta *meta) static ssize_t fetch_read(struct apk_istream *is, void *ptr, size_t size) { struct apk_fetch_istream *fis = container_of(is, struct apk_fetch_istream, is); - ssize_t i = 0, r; + ssize_t r; - if (ptr == NULL) return apk_istream_skip(&fis->is, size); - - while (i < size) { - r = fetchIO_read(fis->fetchIO, ptr + i, size - i); - if (r < 0) return -EIO; - if (r == 0) break; - i += r; - } - - return i; + r = fetchIO_read(fis->fetchIO, ptr, size); + if (r < 0) return -EIO; + return r; } static void fetch_close(struct apk_istream *is) @@ -120,7 +113,7 @@ static struct apk_istream *apk_istream_fetch(const char *url, time_t since) rc = -EAPKBADURL; goto err; } - fis = malloc(sizeof(*fis)); + fis = malloc(sizeof *fis + apk_io_bufsize); if (!fis) { rc = -ENOMEM; goto err; @@ -135,6 +128,8 @@ static struct apk_istream *apk_istream_fetch(const char *url, time_t since) *fis = (struct apk_fetch_istream) { .is.ops = &fetch_istream_ops, + .is.buf = (uint8_t*)(fis+1), + .is.buf_size = apk_io_bufsize, .fetchIO = io, .urlstat = fis->urlstat, };