istream: add buffering capability

Convert all implementations to do buffering. This is in preparation
to remove bstream interface as redundant.

istream_read() will return full reads unless end-of-file. The backends
can return short reads to optimize buffering or due to other reasons
like boundary change for gz.
cute-signatures
Timo Teräs 2020-01-10 11:02:48 +02:00
parent 9dda2d3c21
commit 7ca0d146ec
5 changed files with 109 additions and 103 deletions

View File

@ -53,6 +53,8 @@ struct apk_file_info {
struct apk_xattr_array *xattrs; struct apk_xattr_array *xattrs;
}; };
extern size_t apk_io_bufsize;
struct apk_istream; struct apk_istream;
struct apk_bstream; struct apk_bstream;
struct apk_ostream; struct apk_ostream;
@ -63,10 +65,48 @@ struct apk_istream_ops {
void (*close)(struct apk_istream *is); void (*close)(struct apk_istream *is);
}; };
#define APK_ISTREAM_SINGLE_READ 0x0001
struct apk_istream { struct apk_istream {
uint8_t *ptr, *end, *buf;
size_t buf_size;
int err;
unsigned int flags;
const struct apk_istream_ops *ops; const struct apk_istream_ops *ops;
}; };
struct apk_istream *apk_istream_from_file(int atfd, const char *file);
struct apk_istream *apk_istream_from_file_gz(int atfd, const char *file);
struct apk_istream *apk_istream_from_fd(int fd);
struct apk_istream *apk_istream_from_fd_url_if_modified(int atfd, const char *url, time_t since);
struct apk_istream *apk_istream_from_url_gz(const char *url);
ssize_t apk_istream_read(struct apk_istream *is, void *ptr, size_t size);
#define APK_SPLICE_ALL 0xffffffff
ssize_t apk_istream_splice(struct apk_istream *is, int fd, size_t size,
apk_progress_cb cb, void *cb_ctx);
static inline struct apk_istream *apk_istream_from_url(const char *url)
{
return apk_istream_from_fd_url_if_modified(AT_FDCWD, url, 0);
}
static inline struct apk_istream *apk_istream_from_fd_url(int atfd, const char *url)
{
return apk_istream_from_fd_url_if_modified(atfd, url, 0);
}
static inline struct apk_istream *apk_istream_from_url_if_modified(const char *url, time_t since)
{
return apk_istream_from_fd_url_if_modified(AT_FDCWD, url, since);
}
static inline void apk_istream_get_meta(struct apk_istream *is, struct apk_file_meta *meta)
{
is->ops->get_meta(is, meta);
}
static inline void apk_istream_close(struct apk_istream *is)
{
is->ops->close(is);
}
#define APK_BSTREAM_SINGLE_READ 0x0001 #define APK_BSTREAM_SINGLE_READ 0x0001
#define APK_BSTREAM_EOF 0x0002 #define APK_BSTREAM_EOF 0x0002
@ -106,41 +146,6 @@ static inline struct apk_istream *apk_bstream_gunzip(struct apk_bstream *bs)
struct apk_ostream *apk_ostream_gzip(struct apk_ostream *); struct apk_ostream *apk_ostream_gzip(struct apk_ostream *);
struct apk_ostream *apk_ostream_counter(off_t *); struct apk_ostream *apk_ostream_counter(off_t *);
struct apk_istream *apk_istream_from_file(int atfd, const char *file);
struct apk_istream *apk_istream_from_file_gz(int atfd, const char *file);
struct apk_istream *apk_istream_from_fd(int fd);
struct apk_istream *apk_istream_from_fd_url_if_modified(int atfd, const char *url, time_t since);
struct apk_istream *apk_istream_from_url_gz(const char *url);
ssize_t apk_istream_skip(struct apk_istream *istream, size_t size);
#define APK_SPLICE_ALL 0xffffffff
ssize_t apk_istream_splice(struct apk_istream *is, int fd, size_t size,
apk_progress_cb cb, void *cb_ctx);
static inline struct apk_istream *apk_istream_from_url(const char *url)
{
return apk_istream_from_fd_url_if_modified(AT_FDCWD, url, 0);
}
static inline struct apk_istream *apk_istream_from_fd_url(int atfd, const char *url)
{
return apk_istream_from_fd_url_if_modified(atfd, url, 0);
}
static inline struct apk_istream *apk_istream_from_url_if_modified(const char *url, time_t since)
{
return apk_istream_from_fd_url_if_modified(AT_FDCWD, url, since);
}
static inline void apk_istream_get_meta(struct apk_istream *is, struct apk_file_meta *meta)
{
is->ops->get_meta(is, meta);
}
static inline ssize_t apk_istream_read(struct apk_istream *is, void *ptr, size_t size)
{
return is->ops->read(is, ptr, size);
}
static inline void apk_istream_close(struct apk_istream *is)
{
is->ops->close(is);
}
struct apk_bstream *apk_bstream_from_istream(struct apk_istream *istream); struct apk_bstream *apk_bstream_from_istream(struct apk_istream *istream);
struct apk_bstream *apk_bstream_from_file(int atfd, const char *file); struct apk_bstream *apk_bstream_from_file(int atfd, const char *file);

View File

@ -303,8 +303,10 @@ int apk_tar_parse(struct apk_istream *is, apk_archive_entry_parser parser,
toskip += 512 - ((offset + toskip) & 511); toskip += 512 - ((offset + toskip) & 511);
offset += toskip; offset += toskip;
if (toskip != 0) { if (toskip != 0) {
if ((r = apk_istream_read(is, NULL, toskip)) != toskip) if ((r = apk_istream_read(is, NULL, toskip)) != toskip) {
r = -EIO;
goto err; goto err;
}
} }
} }

View File

@ -22,7 +22,6 @@ struct apk_gzip_istream {
struct apk_istream is; struct apk_istream is;
struct apk_bstream *bs; struct apk_bstream *bs;
z_stream zs; z_stream zs;
int err;
apk_multipart_cb cb; apk_multipart_cb cb;
void *cbctx; void *cbctx;
@ -40,9 +39,9 @@ static int gzi_boundary_change(struct apk_gzip_istream *gis)
{ {
int r; int r;
r = gis->cb(gis->cbctx, gis->err ? APK_MPART_END : APK_MPART_BOUNDARY, gis->cbarg); r = gis->cb(gis->cbctx, gis->is.err ? APK_MPART_END : APK_MPART_BOUNDARY, gis->cbarg);
if (r > 0) r = -ECANCELED; if (r > 0) r = -ECANCELED;
if (r != 0) gis->err = r; if (r != 0) gis->is.err = r;
return r; return r;
} }
@ -51,19 +50,10 @@ static ssize_t gzi_read(struct apk_istream *is, void *ptr, size_t size)
struct apk_gzip_istream *gis = container_of(is, struct apk_gzip_istream, is); struct apk_gzip_istream *gis = container_of(is, struct apk_gzip_istream, is);
int r; int r;
if (gis->err != 0) {
if (gis->err > 0)
return 0;
return gis->err;
}
if (ptr == NULL)
return apk_istream_skip(&gis->is, size);
gis->zs.avail_out = size; gis->zs.avail_out = size;
gis->zs.next_out = ptr; gis->zs.next_out = ptr;
while (gis->zs.avail_out != 0 && gis->err == 0) { while (gis->zs.avail_out != 0 && gis->is.err == 0) {
if (!APK_BLOB_IS_NULL(gis->cbarg)) { if (!APK_BLOB_IS_NULL(gis->cbarg)) {
if (gzi_boundary_change(gis)) if (gzi_boundary_change(gis))
goto ret; goto ret;
@ -83,10 +73,10 @@ static ssize_t gzi_read(struct apk_istream *is, void *ptr, size_t size)
gis->zs.avail_in = blob.len; gis->zs.avail_in = blob.len;
gis->zs.next_in = (void *) gis->cbprev; gis->zs.next_in = (void *) gis->cbprev;
if (blob.len < 0) { if (blob.len < 0) {
gis->err = blob.len; gis->is.err = blob.len;
goto ret; goto ret;
} else if (gis->zs.avail_in == 0) { } else if (gis->zs.avail_in == 0) {
gis->err = 1; gis->is.err = 1;
gis->cbarg = APK_BLOB_NULL; gis->cbarg = APK_BLOB_NULL;
gzi_boundary_change(gis); gzi_boundary_change(gis);
goto ret; goto ret;
@ -99,7 +89,7 @@ static ssize_t gzi_read(struct apk_istream *is, void *ptr, size_t size)
/* Digest the inflated bytes */ /* Digest the inflated bytes */
if ((gis->bs->flags & APK_BSTREAM_EOF) && if ((gis->bs->flags & APK_BSTREAM_EOF) &&
gis->zs.avail_in == 0) gis->zs.avail_in == 0)
gis->err = 1; gis->is.err = 1;
if (gis->cb != NULL) { if (gis->cb != NULL) {
gis->cbarg = APK_BLOB_PTR_LEN(gis->cbprev, (void *) gis->zs.next_in - gis->cbprev); gis->cbarg = APK_BLOB_PTR_LEN(gis->cbprev, (void *) gis->zs.next_in - gis->cbprev);
gis->cbprev = gis->zs.next_in; gis->cbprev = gis->zs.next_in;
@ -109,26 +99,24 @@ static ssize_t gzi_read(struct apk_istream *is, void *ptr, size_t size)
* callback here, as we won't be called again. * callback here, as we won't be called again.
* For boundaries it should be postponed to not * For boundaries it should be postponed to not
* be called until next gzip read is started. */ * be called until next gzip read is started. */
if (gis->err) { if (gis->is.err) {
gzi_boundary_change(gis); gzi_boundary_change(gis);
goto ret; goto ret;
} }
inflateEnd(&gis->zs); inflateEnd(&gis->zs);
if (inflateInit2(&gis->zs, 15+32) != Z_OK) if (inflateInit2(&gis->zs, 15+32) != Z_OK)
return -ENOMEM; return -ENOMEM;
if (gis->cb) goto ret;
break; break;
case Z_OK: case Z_OK:
break; break;
default: default:
gis->err = -EIO; gis->is.err = -EIO;
break; break;
} }
} }
ret: ret:
if (size - gis->zs.avail_out == 0)
return gis->err < 0 ? gis->err : 0;
return size - gis->zs.avail_out; return size - gis->zs.avail_out;
} }
@ -154,11 +142,13 @@ struct apk_istream *apk_bstream_gunzip_mpart(struct apk_bstream *bs,
if (IS_ERR_OR_NULL(bs)) return ERR_CAST(bs); if (IS_ERR_OR_NULL(bs)) return ERR_CAST(bs);
gis = malloc(sizeof(struct apk_gzip_istream)); gis = malloc(sizeof(*gis) + apk_io_bufsize);
if (!gis) goto err; if (!gis) goto err;
*gis = (struct apk_gzip_istream) { *gis = (struct apk_gzip_istream) {
.is.ops = &gunzip_istream_ops, .is.ops = &gunzip_istream_ops,
.is.buf = (uint8_t*)(gis + 1),
.is.buf_size = apk_io_bufsize,
.bs = bs, .bs = bs,
.cb = cb, .cb = cb,
.cbctx = ctx, .cbctx = ctx,

View File

@ -35,6 +35,8 @@
#define HAVE_FGETGRENT_R #define HAVE_FGETGRENT_R
#endif #endif
size_t apk_io_bufsize = 2*1024;
static void apk_file_meta_from_fd(int fd, struct apk_file_meta *meta) static void apk_file_meta_from_fd(int fd, struct apk_file_meta *meta)
{ {
struct stat st; struct stat st;
@ -56,6 +58,44 @@ void apk_file_meta_to_fd(int fd, struct apk_file_meta *meta)
futimens(fd, times); futimens(fd, times);
} }
ssize_t apk_istream_read(struct apk_istream *is, void *ptr, size_t size)
{
ssize_t left = size, r = 0;
while (left) {
if (is->ptr != is->end) {
r = MIN(left, is->end - is->ptr);
if (ptr) {
memcpy(ptr, is->ptr, r);
ptr += r;
}
is->ptr += r;
left -= r;
continue;
}
if (is->err) break;
if (ptr && left > is->buf_size/4) {
r = is->ops->read(is, ptr, left);
if (r <= 0) break;
left -= r;
ptr += r;
continue;
}
r = is->ops->read(is, is->buf, is->buf_size);
if (r <= 0) break;
is->ptr = is->buf;
is->end = is->buf + r;
}
if (r < 0) return r;
if (size && left == size && !is->err) is->err = 1;
if (size == left) return is->err < 0 ? is->err : 0;
return size - left;
}
struct apk_fd_istream { struct apk_fd_istream {
struct apk_istream is; struct apk_istream is;
int fd; int fd;
@ -70,24 +110,11 @@ static void fdi_get_meta(struct apk_istream *is, struct apk_file_meta *meta)
static ssize_t fdi_read(struct apk_istream *is, void *ptr, size_t size) static ssize_t fdi_read(struct apk_istream *is, void *ptr, size_t size)
{ {
struct apk_fd_istream *fis = container_of(is, struct apk_fd_istream, is); struct apk_fd_istream *fis = container_of(is, struct apk_fd_istream, is);
ssize_t i = 0, r; ssize_t r;
if (ptr == NULL) { r = read(fis->fd, ptr, size);
if (lseek(fis->fd, size, SEEK_CUR) < 0) if (r < 0) return -errno;
return -errno; return r;
return size;
}
while (i < size) {
r = read(fis->fd, ptr + i, size - i);
if (r < 0)
return -errno;
if (r == 0)
break;
i += r;
}
return i;
} }
static void fdi_close(struct apk_istream *is) static void fdi_close(struct apk_istream *is)
@ -110,7 +137,7 @@ struct apk_istream *apk_istream_from_fd(int fd)
if (fd < 0) return ERR_PTR(-EBADF); if (fd < 0) return ERR_PTR(-EBADF);
fis = malloc(sizeof(struct apk_fd_istream)); fis = malloc(sizeof(*fis) + apk_io_bufsize);
if (fis == NULL) { if (fis == NULL) {
close(fd); close(fd);
return ERR_PTR(-ENOMEM); return ERR_PTR(-ENOMEM);
@ -118,6 +145,8 @@ struct apk_istream *apk_istream_from_fd(int fd)
*fis = (struct apk_fd_istream) { *fis = (struct apk_fd_istream) {
.is.ops = &fd_istream_ops, .is.ops = &fd_istream_ops,
.is.buf = (uint8_t *)(fis + 1),
.is.buf_size = apk_io_bufsize,
.fd = fd, .fd = fd,
}; };
@ -134,21 +163,6 @@ struct apk_istream *apk_istream_from_file(int atfd, const char *file)
return apk_istream_from_fd(fd); return apk_istream_from_fd(fd);
} }
ssize_t apk_istream_skip(struct apk_istream *is, size_t size)
{
unsigned char buf[2048];
size_t done = 0, togo;
ssize_t r;
while (done < size) {
togo = MIN(size - done, sizeof buf);
r = apk_istream_read(is, buf, togo);
if (r <= 0) return r ?: -EIO;
done += r;
}
return done;
}
ssize_t apk_istream_splice(struct apk_istream *is, int fd, size_t size, ssize_t apk_istream_splice(struct apk_istream *is, int fd, size_t size,
apk_progress_cb cb, void *cb_ctx) apk_progress_cb cb, void *cb_ctx)
{ {

View File

@ -80,18 +80,11 @@ static void fetch_get_meta(struct apk_istream *is, struct apk_file_meta *meta)
static ssize_t fetch_read(struct apk_istream *is, void *ptr, size_t size) static ssize_t fetch_read(struct apk_istream *is, void *ptr, size_t size)
{ {
struct apk_fetch_istream *fis = container_of(is, struct apk_fetch_istream, is); struct apk_fetch_istream *fis = container_of(is, struct apk_fetch_istream, is);
ssize_t i = 0, r; ssize_t r;
if (ptr == NULL) return apk_istream_skip(&fis->is, size); r = fetchIO_read(fis->fetchIO, ptr, size);
if (r < 0) return -EIO;
while (i < size) { return r;
r = fetchIO_read(fis->fetchIO, ptr + i, size - i);
if (r < 0) return -EIO;
if (r == 0) break;
i += r;
}
return i;
} }
static void fetch_close(struct apk_istream *is) static void fetch_close(struct apk_istream *is)
@ -120,7 +113,7 @@ static struct apk_istream *apk_istream_fetch(const char *url, time_t since)
rc = -EAPKBADURL; rc = -EAPKBADURL;
goto err; goto err;
} }
fis = malloc(sizeof(*fis)); fis = malloc(sizeof *fis + apk_io_bufsize);
if (!fis) { if (!fis) {
rc = -ENOMEM; rc = -ENOMEM;
goto err; goto err;
@ -135,6 +128,8 @@ static struct apk_istream *apk_istream_fetch(const char *url, time_t since)
*fis = (struct apk_fetch_istream) { *fis = (struct apk_fetch_istream) {
.is.ops = &fetch_istream_ops, .is.ops = &fetch_istream_ops,
.is.buf = (uint8_t*)(fis+1),
.is.buf_size = apk_io_bufsize,
.fetchIO = io, .fetchIO = io,
.urlstat = fis->urlstat, .urlstat = fis->urlstat,
}; };