| /////////////////////////////////////////////////////////////////////////////// |
| // |
| /// \file outqueue.c |
| /// \brief Output queue handling in multithreaded coding |
| // |
| // Author: Lasse Collin |
| // |
| // This file has been put into the public domain. |
| // You can do whatever you want with this file. |
| // |
| /////////////////////////////////////////////////////////////////////////////// |
| |
| #include "outqueue.h" |
| |
| |
/// Upper bound for the number of output buffers that may be allocated
/// for the given number of threads. Currently this is two buffers per
/// thread, which is a compromise between RAM usage and keeping the worker
/// threads busy when buffers finish out of order.
#define GET_BUFS_LIMIT(threads) ((threads) * 2)
| |
| |
| extern uint64_t |
| lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads) |
| { |
| // This is to ease integer overflow checking: We may allocate up to |
| // GET_BUFS_LIMIT(LZMA_THREADS_MAX) buffers and we need some extra |
| // memory for other data structures too (that's the /2). |
| // |
| // lzma_outq_prealloc_buf() will still accept bigger buffers than this. |
| const uint64_t limit |
| = UINT64_MAX / GET_BUFS_LIMIT(LZMA_THREADS_MAX) / 2; |
| |
| if (threads > LZMA_THREADS_MAX || buf_size_max > limit) |
| return UINT64_MAX; |
| |
| return GET_BUFS_LIMIT(threads) |
| * lzma_outq_outbuf_memusage(buf_size_max); |
| } |
| |
| |
| static void |
| move_head_to_cache(lzma_outq *outq, const lzma_allocator *allocator) |
| { |
| assert(outq->head != NULL); |
| assert(outq->tail != NULL); |
| assert(outq->bufs_in_use > 0); |
| |
| lzma_outbuf *buf = outq->head; |
| outq->head = buf->next; |
| if (outq->head == NULL) |
| outq->tail = NULL; |
| |
| if (outq->cache != NULL && outq->cache->allocated != buf->allocated) |
| lzma_outq_clear_cache(outq, allocator); |
| |
| buf->next = outq->cache; |
| outq->cache = buf; |
| |
| --outq->bufs_in_use; |
| outq->mem_in_use -= lzma_outq_outbuf_memusage(buf->allocated); |
| |
| return; |
| } |
| |
| |
| static void |
| free_one_cached_buffer(lzma_outq *outq, const lzma_allocator *allocator) |
| { |
| assert(outq->cache != NULL); |
| |
| lzma_outbuf *buf = outq->cache; |
| outq->cache = buf->next; |
| |
| --outq->bufs_allocated; |
| outq->mem_allocated -= lzma_outq_outbuf_memusage(buf->allocated); |
| |
| lzma_free(buf, allocator); |
| return; |
| } |
| |
| |
| extern void |
| lzma_outq_clear_cache(lzma_outq *outq, const lzma_allocator *allocator) |
| { |
| while (outq->cache != NULL) |
| free_one_cached_buffer(outq, allocator); |
| |
| return; |
| } |
| |
| |
| extern void |
| lzma_outq_clear_cache2(lzma_outq *outq, const lzma_allocator *allocator, |
| size_t keep_size) |
| { |
| if (outq->cache == NULL) |
| return; |
| |
| // Free all but one. |
| while (outq->cache->next != NULL) |
| free_one_cached_buffer(outq, allocator); |
| |
| // Free the last one only if its size doesn't equal to keep_size. |
| if (outq->cache->allocated != keep_size) |
| free_one_cached_buffer(outq, allocator); |
| |
| return; |
| } |
| |
| |
| extern lzma_ret |
| lzma_outq_init(lzma_outq *outq, const lzma_allocator *allocator, |
| uint32_t threads) |
| { |
| if (threads > LZMA_THREADS_MAX) |
| return LZMA_OPTIONS_ERROR; |
| |
| const uint32_t bufs_limit = GET_BUFS_LIMIT(threads); |
| |
| // Clear head/tail. |
| while (outq->head != NULL) |
| move_head_to_cache(outq, allocator); |
| |
| // If new buf_limit is lower than the old one, we may need to free |
| // a few cached buffers. |
| while (bufs_limit < outq->bufs_allocated) |
| free_one_cached_buffer(outq, allocator); |
| |
| outq->bufs_limit = bufs_limit; |
| outq->read_pos = 0; |
| |
| return LZMA_OK; |
| } |
| |
| |
| extern void |
| lzma_outq_end(lzma_outq *outq, const lzma_allocator *allocator) |
| { |
| while (outq->head != NULL) |
| move_head_to_cache(outq, allocator); |
| |
| lzma_outq_clear_cache(outq, allocator); |
| return; |
| } |
| |
| |
| extern lzma_ret |
| lzma_outq_prealloc_buf(lzma_outq *outq, const lzma_allocator *allocator, |
| size_t size) |
| { |
| // Caller must have checked it with lzma_outq_has_buf(). |
| assert(outq->bufs_in_use < outq->bufs_limit); |
| |
| // If there already is appropriately-sized buffer in the cache, |
| // we need to do nothing. |
| if (outq->cache != NULL && outq->cache->allocated == size) |
| return LZMA_OK; |
| |
| if (size > SIZE_MAX - sizeof(lzma_outbuf)) |
| return LZMA_MEM_ERROR; |
| |
| const size_t alloc_size = lzma_outq_outbuf_memusage(size); |
| |
| // The cache may have buffers but their size is wrong. |
| lzma_outq_clear_cache(outq, allocator); |
| |
| outq->cache = lzma_alloc(alloc_size, allocator); |
| if (outq->cache == NULL) |
| return LZMA_MEM_ERROR; |
| |
| outq->cache->next = NULL; |
| outq->cache->allocated = size; |
| |
| ++outq->bufs_allocated; |
| outq->mem_allocated += alloc_size; |
| |
| return LZMA_OK; |
| } |
| |
| |
| extern lzma_outbuf * |
| lzma_outq_get_buf(lzma_outq *outq, void *worker) |
| { |
| // Caller must have used lzma_outq_prealloc_buf() to ensure these. |
| assert(outq->bufs_in_use < outq->bufs_limit); |
| assert(outq->bufs_in_use < outq->bufs_allocated); |
| assert(outq->cache != NULL); |
| |
| lzma_outbuf *buf = outq->cache; |
| outq->cache = buf->next; |
| buf->next = NULL; |
| |
| if (outq->tail != NULL) { |
| assert(outq->head != NULL); |
| outq->tail->next = buf; |
| } else { |
| assert(outq->head == NULL); |
| outq->head = buf; |
| } |
| |
| outq->tail = buf; |
| |
| buf->worker = worker; |
| buf->finished = false; |
| buf->finish_ret = LZMA_STREAM_END; |
| buf->pos = 0; |
| buf->decoder_in_pos = 0; |
| |
| buf->unpadded_size = 0; |
| buf->uncompressed_size = 0; |
| |
| ++outq->bufs_in_use; |
| outq->mem_in_use += lzma_outq_outbuf_memusage(buf->allocated); |
| |
| return buf; |
| } |
| |
| |
| extern bool |
| lzma_outq_is_readable(const lzma_outq *outq) |
| { |
| if (outq->head == NULL) |
| return false; |
| |
| return outq->read_pos < outq->head->pos || outq->head->finished; |
| } |
| |
| |
| extern lzma_ret |
| lzma_outq_read(lzma_outq *restrict outq, |
| const lzma_allocator *restrict allocator, |
| uint8_t *restrict out, size_t *restrict out_pos, |
| size_t out_size, |
| lzma_vli *restrict unpadded_size, |
| lzma_vli *restrict uncompressed_size) |
| { |
| // There must be at least one buffer from which to read. |
| if (outq->bufs_in_use == 0) |
| return LZMA_OK; |
| |
| // Get the buffer. |
| lzma_outbuf *buf = outq->head; |
| |
| // Copy from the buffer to output. |
| // |
| // FIXME? In threaded decoder it may be bad to do this copy while |
| // the mutex is being held. |
| lzma_bufcpy(buf->buf, &outq->read_pos, buf->pos, |
| out, out_pos, out_size); |
| |
| // Return if we didn't get all the data from the buffer. |
| if (!buf->finished || outq->read_pos < buf->pos) |
| return LZMA_OK; |
| |
| // The buffer was finished. Tell the caller its size information. |
| if (unpadded_size != NULL) |
| *unpadded_size = buf->unpadded_size; |
| |
| if (uncompressed_size != NULL) |
| *uncompressed_size = buf->uncompressed_size; |
| |
| // Remember the return value. |
| const lzma_ret finish_ret = buf->finish_ret; |
| |
| // Free this buffer for further use. |
| move_head_to_cache(outq, allocator); |
| outq->read_pos = 0; |
| |
| return finish_ret; |
| } |
| |
| |
| extern void |
| lzma_outq_enable_partial_output(lzma_outq *outq, |
| void (*enable_partial_output)(void *worker)) |
| { |
| if (outq->head != NULL && !outq->head->finished |
| && outq->head->worker != NULL) { |
| enable_partial_output(outq->head->worker); |
| |
| // Set it to NULL since calling it twice is pointless. |
| outq->head->worker = NULL; |
| } |
| |
| return; |
| } |