From 1239780f60cb4acb297639335ab193fb1835b7cf Mon Sep 17 00:00:00 2001 From: Marcin Szkudlinski Date: Wed, 16 Aug 2023 08:32:28 +0200 Subject: [PATCH] dp: introduce dp_queue DP queue is a circular buffer providing safe consumer/producer cached operations cross cores Both data consumer and data producer declare max chunk sizes they want to use (IBS/OBS) The queue may work in 2 modes 1) simple mode in case both receiver and sender are located on the same core and cache coherency does not matter. dp_queue structure is located in cached memory In this case DP Queue is a simple ring buffer Buffer size must be: - 2*MAX(IBS,OBS) if IBS(obs) is a multiplication of OBS(IBS) - 3*MAX(IBS,OBS) otherwise 2) shared mode In this case we need to writeback cache when new data arrive and invalidate cache on secondary core. That means the whole cacheline must be used exclusively by sink or by source. Incoming data will be available to use when a cacheline is filled completely dp_queue structure is located in shared memory buffer size is always 3*MAX(IBS,OBS,CACHELINE) + CACHELINE Signed-off-by: Marcin Szkudlinski --- src/audio/dp_queue.c | 391 +++++++++++++++++++++++++++++++ src/include/sof/audio/dp_queue.h | 87 +++++++ zephyr/CMakeLists.txt | 3 + 3 files changed, 481 insertions(+) create mode 100644 src/audio/dp_queue.c create mode 100644 src/include/sof/audio/dp_queue.h diff --git a/src/audio/dp_queue.c b/src/audio/dp_queue.c new file mode 100644 index 000000000000..7a0344692ee2 --- /dev/null +++ b/src/audio/dp_queue.c @@ -0,0 +1,391 @@ +// SPDX-License-Identifier: BSD-3-Clause +// +// Copyright(c) 2021 Intel Corporation. All rights reserved. +// + +#include +#include +#include +#include + +#include +#include + +struct dp_queue { + struct sof_source source_api; /**< src api handler */ + struct sof_sink sink_api; /**< sink api handler */ + + uint32_t ibs; + uint32_t obs; + uint32_t flags; + + size_t data_buffer_size; + uint8_t __sparse_cache *data_buffer; + + uint32_t write_offset; + uint32_t read_offset; + uint32_t available_data; /* amount of data ready for immediate reading */ + /* free buffer space + * NOTE! + * - when dp queue is shared, available_data + free_space DOES NOT eq data_buffer_size + * - when dp queue is not shared available_data + free_space always == data_buffer_size + */ + uint32_t free_space; + + bool hw_params_configured; + struct sof_audio_stream_params audio_stream_params; + + struct k_spinlock lock; +}; + +union lock_key { + k_spinlock_key_t spinlock; + int irq_lock; +}; + +static inline bool dp_queue_is_shared(struct dp_queue *dp_queue) +{ + return !!(dp_queue->flags & DP_QUEUE_MODE_SHARED); +} + +static inline uint8_t __sparse_cache *dp_queue_buffer_end(struct dp_queue *dp_queue) +{ + return dp_queue->data_buffer + dp_queue->data_buffer_size; +} + +static inline struct dp_queue *dp_queue_get_from_sink(struct sof_sink *sink) +{ + return container_of(sink, struct dp_queue, sink_api); +} + +static inline struct dp_queue *dp_queue_get_from_source(struct sof_source *source) +{ + return container_of(source, struct dp_queue, source_api); +} + +static inline void dp_queue_invalidate_shared(struct dp_queue *dp_queue, + void __sparse_cache *ptr, uint32_t size) +{ + /* wrap-around? */ + if ((uintptr_t)ptr + size > (uintptr_t)dp_queue_buffer_end(dp_queue)) { + /* writeback till the end of circular buffer */ + dcache_invalidate_region + (ptr, (uintptr_t)dp_queue_buffer_end(dp_queue) - (uintptr_t)ptr); + size -= (uintptr_t)dp_queue_buffer_end(dp_queue) - (uintptr_t)ptr; + ptr = (__sparse_force void __sparse_cache *)dp_queue->data_buffer; + } + /* invalidate rest of data */ + dcache_invalidate_region(ptr, size); +} + +static inline union lock_key dp_queue_lock(struct dp_queue *dp_queue) +{ + /* use cross-core spinlock in case of shared queue + * When shared, dp_queue structure is located in not cached memory + * as required by spinlock + */ + union lock_key key; + + if (dp_queue_is_shared(dp_queue)) + key.spinlock = k_spin_lock(&dp_queue->lock); + else + /* use faster irq_lock in case of not shared queue (located in cached mem) */ + key.irq_lock = irq_lock(); + + return key; +} + +static inline void dp_queue_unlock(struct dp_queue *dp_queue, union lock_key key) +{ + if (dp_queue_is_shared(dp_queue)) + k_spin_unlock(&dp_queue->lock, key.spinlock); + else + irq_unlock(key.irq_lock); +} + +struct sof_sink *dp_queue_get_sink(struct dp_queue *dp_queue) +{ + return &dp_queue->sink_api; +} + +struct sof_source *dp_queue_get_source(struct dp_queue *dp_queue) +{ + return &dp_queue->source_api; +} + +static size_t dp_queue_get_free_size(struct sof_sink *sink) +{ + struct dp_queue *dp_queue = dp_queue_get_from_sink(sink); + + return dp_queue->free_space; +} + +static int dp_queue_get_buffer(struct sof_sink *sink, size_t req_size, + void **data_ptr, void **buffer_start, size_t *buffer_size) +{ + struct dp_queue *dp_queue = dp_queue_get_from_sink(sink); + + /* dp_queue_get_free_size will return free size with adjustment for cacheline if needed */ + if (req_size > dp_queue_get_free_size(sink)) + return -ENODATA; + + /* no need to lock, just reading data that may be modified by commit_buffer only + * + * note! a sparse warning will be generated here till + * https://github.com/thesofproject/sof/issues/8006 is implemented + */ + *data_ptr = dp_queue->data_buffer + dp_queue->write_offset; + *buffer_start = dp_queue->data_buffer; + *buffer_size = dp_queue->data_buffer_size; + + /* provided buffer is an empty space, the requester will perform write operations only + * no need to invalidate cache - will be overwritten anyway + */ + return 0; +} + +static int dp_queue_commit_buffer(struct sof_sink *sink, size_t commit_size) +{ + struct dp_queue *dp_queue = dp_queue_get_from_sink(sink); + + if (commit_size) { + union lock_key key = dp_queue_lock(dp_queue); + + if (dp_queue_is_shared(dp_queue)) { + /* a shared queue. We need to go through committed cachelines one-by-one + * and if the whole cacheline is committed - writeback cache + * and mark data as available for reading + * + * first, calculate the current and last committed cacheline + * as offsets from buffer start + */ + uint32_t current_cacheline = dp_queue->write_offset / + PLATFORM_DCACHE_ALIGN; + + /* Last used cacheline may not be filled completely, calculate cacheline + * containing 1st free byte + */ + uint32_t last_cacheline = (dp_queue->write_offset + commit_size + 1) / + PLATFORM_DCACHE_ALIGN; + uint32_t total_num_of_cachelines = dp_queue->data_buffer_size / + PLATFORM_DCACHE_ALIGN; + /* wrap-around? */ + last_cacheline %= total_num_of_cachelines; + + /* now go one by one. + * if current_cacheline == last_full_cacheline - nothing to do + */ + while (current_cacheline != last_cacheline) { + /* writeback / invalidate */ + uint8_t __sparse_cache *ptr = dp_queue->data_buffer + + (current_cacheline * PLATFORM_DCACHE_ALIGN); + dcache_writeback_region(ptr, PLATFORM_DCACHE_ALIGN); + + /* mark data as available to read */ + dp_queue->available_data += PLATFORM_DCACHE_ALIGN; + /* get next cacheline */ + current_cacheline = (current_cacheline + 1) % + total_num_of_cachelines; + } + } else { + /* not shared */ + dp_queue->available_data += commit_size; + } + + /* move write pointer */ + dp_queue->free_space -= commit_size; + dp_queue->write_offset = (dp_queue->write_offset + commit_size) % + dp_queue->data_buffer_size; + + dp_queue_unlock(dp_queue, key); + } + + return 0; +} + +static size_t dp_queue_get_data_available(struct sof_source *source) +{ + struct dp_queue *dp_queue = dp_queue_get_from_source(source); + + /* access is read only, using uncached alias, no need to lock */ + return dp_queue->available_data; +} + +static int dp_queue_get_data(struct sof_source *source, size_t req_size, + void **data_ptr, void **buffer_start, size_t *buffer_size) +{ + struct dp_queue *dp_queue = dp_queue_get_from_source(source); + + /* no need to lock, just reading data */ + if (req_size > dp_queue_get_data_available(source)) + return -ENODATA; + + /* + * note! a sparse warning will be generated here till + * https://github.com/thesofproject/sof/issues/8006 is implemented + */ + *buffer_start = dp_queue->data_buffer; + *buffer_size = dp_queue->data_buffer_size; + *data_ptr = dp_queue->data_buffer + dp_queue->read_offset; + + /* clean cache in provided data range */ + if (dp_queue_is_shared(dp_queue)) + dp_queue_invalidate_shared(dp_queue, + (__sparse_force void __sparse_cache *)*data_ptr, + req_size); + + return 0; +} + +static int dp_queue_release_data(struct sof_source *source, size_t free_size) +{ + struct dp_queue *dp_queue = dp_queue_get_from_source(source); + + if (free_size) { + /* data consumed, free buffer space, no need for any special cache operations */ + union lock_key key = dp_queue_lock(dp_queue); + + dp_queue->available_data -= free_size; + dp_queue->free_space += free_size; + dp_queue->read_offset = + (dp_queue->read_offset + free_size) % dp_queue->data_buffer_size; + + dp_queue_unlock(dp_queue, key); + } + + return 0; +} + +static int dp_queue_set_ipc_params(struct dp_queue *dp_queue, + struct sof_ipc_stream_params *params, + bool force_update) +{ + if (dp_queue->hw_params_configured && !force_update) + return 0; + + dp_queue->audio_stream_params.frame_fmt = params->frame_fmt; + dp_queue->audio_stream_params.rate = params->rate; + dp_queue->audio_stream_params.channels = params->channels; + dp_queue->audio_stream_params.buffer_fmt = params->buffer_fmt; + + dp_queue->hw_params_configured = true; + + return 0; +} + +/* + * note! a sparse warning will be generated here till + * https://github.com/thesofproject/sof/issues/8006 is implemented + */ + +static int dp_queue_set_ipc_params_source(struct sof_source *source, + struct sof_ipc_stream_params *params, + bool force_update) +{ + struct dp_queue *dp_queue = dp_queue_get_from_source(source); + + return dp_queue_set_ipc_params(dp_queue, params, force_update); +} + +static int dp_queue_set_ipc_params_sink(struct sof_sink *sink, + struct sof_ipc_stream_params *params, + bool force_update) +{ + struct dp_queue *dp_queue = dp_queue_get_from_sink(sink); + + return dp_queue_set_ipc_params(dp_queue, params, force_update); +} + +/* + * note! a sparse warning will be generated here till + * https://github.com/thesofproject/sof/issues/8006 is implemented + */ +static const struct source_ops dp_queue_source_ops = { + .get_data_available = dp_queue_get_data_available, + .get_data = dp_queue_get_data, + .release_data = dp_queue_release_data, + .audio_set_ipc_params = dp_queue_set_ipc_params_source, +}; + +static const struct sink_ops dp_queue_sink_ops = { + .get_free_size = dp_queue_get_free_size, + .get_buffer = dp_queue_get_buffer, + .commit_buffer = dp_queue_commit_buffer, + .audio_set_ipc_params = dp_queue_set_ipc_params_sink, +}; + +struct dp_queue *dp_queue_create(uint32_t ibs, uint32_t obs, uint32_t flags) +{ + uint32_t align; + struct dp_queue *dp_queue; + + /* allocate DP structure */ + if (flags & DP_QUEUE_MODE_SHARED) + dp_queue = rzalloc(SOF_MEM_ZONE_RUNTIME_SHARED, 0, SOF_MEM_CAPS_RAM, + sizeof(*dp_queue)); + else + dp_queue = rzalloc(SOF_MEM_ZONE_RUNTIME, 0, SOF_MEM_CAPS_RAM, sizeof(*dp_queue)); + if (!dp_queue) + return NULL; + + dp_queue->flags = flags; + + /* initiate sink/source provide uncached pointers to audio_stream_params + * + * note! a sparse warning will be generated here till + * https://github.com/thesofproject/sof/issues/8006 is implemented + */ + source_init(dp_queue_get_source(dp_queue), &dp_queue_source_ops, + &dp_queue->audio_stream_params); + sink_init(dp_queue_get_sink(dp_queue), &dp_queue_sink_ops, + &dp_queue->audio_stream_params); + + /* set original ibs/obs as required by the module */ + sink_set_min_free(&dp_queue->sink_api, ibs); + source_set_min_available(&dp_queue->source_api, obs); + + /* calculate required buffer size */ + if (dp_queue_is_shared(dp_queue)) { + ibs = MAX(ibs, PLATFORM_DCACHE_ALIGN); + obs = MAX(obs, PLATFORM_DCACHE_ALIGN); + align = PLATFORM_DCACHE_ALIGN; + dp_queue->data_buffer_size = 3 * MAX(ibs, obs) + PLATFORM_DCACHE_ALIGN; + /* reserve 1 cacheline to ensure exclusive use */ + dp_queue->free_space = dp_queue->data_buffer_size - PLATFORM_DCACHE_ALIGN; + } else { + if ((ibs % obs == 0) && (obs % ibs == 0)) + dp_queue->data_buffer_size = 2 * MAX(ibs, obs); + else + dp_queue->data_buffer_size = 3 * MAX(ibs, obs); + align = 0; + dp_queue->free_space = dp_queue->data_buffer_size; + } + dp_queue->available_data = 0; + + /* allocate data buffer - always in cached memory */ + dp_queue->data_buffer = (__sparse_force __sparse_cache void *) + rballoc_align(0, 0, dp_queue->data_buffer_size, align); + if (!dp_queue->data_buffer) + goto err; + + /* return allocated structure */ + return dp_queue; +err: + rfree(dp_queue); + return NULL; +} + +void dp_queue_free(struct dp_queue *dp_queue) +{ + dp_queue_invalidate_shared(dp_queue, + (__sparse_force void __sparse_cache *)dp_queue->data_buffer, + dp_queue->data_buffer_size); + + rfree((__sparse_force void *)dp_queue->data_buffer); + rfree(dp_queue); +} + +struct sof_audio_stream_params *dp_queue_get_audio_params(struct dp_queue *dp_queue) +{ + return &dp_queue->audio_stream_params; +} diff --git a/src/include/sof/audio/dp_queue.h b/src/include/sof/audio/dp_queue.h new file mode 100644 index 000000000000..5f29df952f45 --- /dev/null +++ b/src/include/sof/audio/dp_queue.h @@ -0,0 +1,87 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright(c) 2023 Intel Corporation. All rights reserved. + * + */ + +#ifndef __SOF_DP_QUEUE_H__ +#define __SOF_DP_QUEUE_H__ + +#include +#include +#include + +/** + * DP queue is a circular buffer providing safe consumer/producer cached operations cross cores + * + * prerequisites: + * 1) incoming and outgoing data rate MUST be the same + * 2) Both data consumer and data producer declare max chunk sizes they want to use (IBS/OBS) + * + * The queue may work in 2 modes + * 1) simple mode + * in case both receiver and sender are located on the same core and cache coherency + * does not matter. dp_queue structure is located in cached memory + * In this case DP Queue is a simple ring buffer + * Buffer size must be: + * - 2*MAX(IBS,OBS) if IBS(obs) is a multiplication of OBS(IBS) + * - 3*MAX(IBS,OBS) otherwise + * + * 2) shared mode + * In this case we need to writeback cache when new data arrive and invalidate cache on + * secondary core. That means the whole cacheline must be used exclusively by sink or by source + * incoming data will be available to use when a cacheline is filled completely + * dp_queue structure is located in shared memory + * + * buffer size is always 3*MAX(IBS,OBS,CACHELINE) + CACHELINE + * + */ + +struct dp_queue; +struct sof_audio_stream_params; + +#define DP_QUEUE_MODE_SIMPLE 0 +#define DP_QUEUE_MODE_SHARED BIT(1) + +/** + * @param ibs input buffer size + * the size of data to be produced in 1 cycle + * the data producer declares here how much data it will produce in single cycle + * + * @param obs output buffer size + * the size of data to be consumed in 1 cycle + * the data receiver declares here how much data it will consume in single cycle + * + * @param flags a combinatin of DP_QUEUE_MODE_* flags determining working mode + * + */ +struct dp_queue *dp_queue_create(uint32_t ibs, uint32_t obs, uint32_t flags); +/** + * @brief free dp queue memory + */ +void dp_queue_free(struct dp_queue *dp_queue); + +/** + * @brief return a handler to sink API of dp_queue. + * the handler may be used by helper functions defined in sink_api.h + * note! a sparse warning will be generated here till + * https://github.com/thesofproject/sof/issues/8006 is implemented + */ +struct sof_sink *dp_queue_get_sink(struct dp_queue *dp_queue); + +/** + * @brief return a handler to source API of dp_queue + * the handler may be used by helper functions defined in source_api.h + * note! a sparse warning will be generated here till + * https://github.com/thesofproject/sof/issues/8006 is implemented + */ +struct sof_source *dp_queue_get_source(struct dp_queue *dp_queue); + +/** + * @brief this is a backdoor to get complete audio params structure from dp_queue + * it is needed till pipeline 2.0 is ready + * + */ +struct sof_audio_stream_params *dp_queue_get_audio_params(struct dp_queue *dp_queue); + +#endif /* __SOF_DP_QUEUE_H__ */ diff --git a/zephyr/CMakeLists.txt b/zephyr/CMakeLists.txt index 07558fa1f178..1d3c5c20cd18 100644 --- a/zephyr/CMakeLists.txt +++ b/zephyr/CMakeLists.txt @@ -425,6 +425,9 @@ zephyr_library_sources( lib.c ) +if(CONFIG_ZEPHYR_DP_SCHEDULER) + zephyr_library_sources(${SOF_AUDIO_PATH}/dp_queue.c) +endif() if(CONFIG_SCHEDULE_DMA_SINGLE_CHANNEL AND NOT(CONFIG_DMA_DOMAIN)) zephyr_library_sources(${SOF_SRC_PATH}/schedule/dma_single_chan_domain.c) endif()