-
Notifications
You must be signed in to change notification settings - Fork 321
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
DP queue is a circular buffer providing safe consumer/producer cached operations cross cores Both data consumer and data producer declare max chunk sizes they want to use (IBS/OBS) The queue may work in 2 modes 1) simple mode in case both receiver and sender are located on the same core and cache coherency does not matter. dp_queue structure is located in cached memory In this case DP Queue is a simple ring buffer Buffer size must be: - 2*MAX(IBS,OBS) if IBS(obs) is a multiplication of OBS(IBS) - 3*MAX(IBS,OBS) otherwise 2) shared mode In this case we need to writeback cache when new data arrive and invalidate cache on secondary core. That means the whole cacheline must be used exclusively by sink or by source. Incoming data will be available to use when a cacheline is filled completely dp_queue structure is located in shared memory buffer size is always 3*MAX(IBS,OBS,CACHELINE) + CACHELINE Signed-off-by: Marcin Szkudlinski <[email protected]>
- Loading branch information
1 parent
17cb6d9
commit 1239780
Showing
3 changed files
with
481 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,391 @@ | ||
// SPDX-License-Identifier: BSD-3-Clause | ||
// | ||
// Copyright(c) 2021 Intel Corporation. All rights reserved. | ||
// | ||
|
||
#include <sof/audio/dp_queue.h> | ||
#include <sof/audio/sink_api_implementation.h> | ||
#include <sof/audio/source_api_implementation.h> | ||
#include <sof/audio/audio_stream.h> | ||
|
||
#include <rtos/alloc.h> | ||
#include <ipc/topology.h> | ||
|
||
struct dp_queue { | ||
struct sof_source source_api; /**< src api handler */ | ||
struct sof_sink sink_api; /**< sink api handler */ | ||
|
||
uint32_t ibs; | ||
uint32_t obs; | ||
uint32_t flags; | ||
|
||
size_t data_buffer_size; | ||
uint8_t __sparse_cache *data_buffer; | ||
|
||
uint32_t write_offset; | ||
uint32_t read_offset; | ||
uint32_t available_data; /* amount of data ready for immediate reading */ | ||
/* free buffer space | ||
* NOTE! | ||
* - when dp queue is shared, available_data + free_space DOES NOT eq data_buffer_size | ||
* - when dp queue is not shared available_data + free_space always == data_buffer_size | ||
*/ | ||
uint32_t free_space; | ||
|
||
bool hw_params_configured; | ||
struct sof_audio_stream_params audio_stream_params; | ||
|
||
struct k_spinlock lock; | ||
}; | ||
|
||
union lock_key { | ||
k_spinlock_key_t spinlock; | ||
int irq_lock; | ||
}; | ||
|
||
static inline bool dp_queue_is_shared(struct dp_queue *dp_queue) | ||
{ | ||
return !!(dp_queue->flags & DP_QUEUE_MODE_SHARED); | ||
} | ||
|
||
static inline uint8_t __sparse_cache *dp_queue_buffer_end(struct dp_queue *dp_queue) | ||
{ | ||
return dp_queue->data_buffer + dp_queue->data_buffer_size; | ||
} | ||
|
||
static inline struct dp_queue *dp_queue_get_from_sink(struct sof_sink *sink) | ||
{ | ||
return container_of(sink, struct dp_queue, sink_api); | ||
} | ||
|
||
static inline struct dp_queue *dp_queue_get_from_source(struct sof_source *source) | ||
{ | ||
return container_of(source, struct dp_queue, source_api); | ||
} | ||
|
||
static inline void dp_queue_invalidate_shared(struct dp_queue *dp_queue, | ||
void __sparse_cache *ptr, uint32_t size) | ||
{ | ||
/* wrap-around? */ | ||
if ((uintptr_t)ptr + size > (uintptr_t)dp_queue_buffer_end(dp_queue)) { | ||
/* writeback till the end of circular buffer */ | ||
dcache_invalidate_region | ||
(ptr, (uintptr_t)dp_queue_buffer_end(dp_queue) - (uintptr_t)ptr); | ||
size -= (uintptr_t)dp_queue_buffer_end(dp_queue) - (uintptr_t)ptr; | ||
ptr = (__sparse_force void __sparse_cache *)dp_queue->data_buffer; | ||
} | ||
/* invalidate rest of data */ | ||
dcache_invalidate_region(ptr, size); | ||
} | ||
|
||
static inline union lock_key dp_queue_lock(struct dp_queue *dp_queue) | ||
{ | ||
/* use cross-core spinlock in case of shared queue | ||
* When shared, dp_queue structure is located in not cached memory | ||
* as required by spinlock | ||
*/ | ||
union lock_key key; | ||
|
||
if (dp_queue_is_shared(dp_queue)) | ||
key.spinlock = k_spin_lock(&dp_queue->lock); | ||
else | ||
/* use faster irq_lock in case of not shared queue (located in cached mem) */ | ||
key.irq_lock = irq_lock(); | ||
|
||
return key; | ||
} | ||
|
||
static inline void dp_queue_unlock(struct dp_queue *dp_queue, union lock_key key) | ||
{ | ||
if (dp_queue_is_shared(dp_queue)) | ||
k_spin_unlock(&dp_queue->lock, key.spinlock); | ||
else | ||
irq_unlock(key.irq_lock); | ||
} | ||
|
||
struct sof_sink *dp_queue_get_sink(struct dp_queue *dp_queue) | ||
{ | ||
return &dp_queue->sink_api; | ||
} | ||
|
||
struct sof_source *dp_queue_get_source(struct dp_queue *dp_queue) | ||
{ | ||
return &dp_queue->source_api; | ||
} | ||
|
||
static size_t dp_queue_get_free_size(struct sof_sink *sink) | ||
{ | ||
struct dp_queue *dp_queue = dp_queue_get_from_sink(sink); | ||
|
||
return dp_queue->free_space; | ||
} | ||
|
||
static int dp_queue_get_buffer(struct sof_sink *sink, size_t req_size, | ||
void **data_ptr, void **buffer_start, size_t *buffer_size) | ||
{ | ||
struct dp_queue *dp_queue = dp_queue_get_from_sink(sink); | ||
|
||
/* dp_queue_get_free_size will return free size with adjustment for cacheline if needed */ | ||
if (req_size > dp_queue_get_free_size(sink)) | ||
return -ENODATA; | ||
|
||
/* no need to lock, just reading data that may be modified by commit_buffer only | ||
* | ||
* note! a sparse warning will be generated here till | ||
* https://github.com/thesofproject/sof/issues/8006 is implemented | ||
*/ | ||
*data_ptr = dp_queue->data_buffer + dp_queue->write_offset; | ||
*buffer_start = dp_queue->data_buffer; | ||
*buffer_size = dp_queue->data_buffer_size; | ||
|
||
/* provided buffer is an empty space, the requester will perform write operations only | ||
* no need to invalidate cache - will be overwritten anyway | ||
*/ | ||
return 0; | ||
} | ||
|
||
static int dp_queue_commit_buffer(struct sof_sink *sink, size_t commit_size) | ||
{ | ||
struct dp_queue *dp_queue = dp_queue_get_from_sink(sink); | ||
|
||
if (commit_size) { | ||
union lock_key key = dp_queue_lock(dp_queue); | ||
|
||
if (dp_queue_is_shared(dp_queue)) { | ||
/* a shared queue. We need to go through committed cachelines one-by-one | ||
* and if the whole cacheline is committed - writeback cache | ||
* and mark data as available for reading | ||
* | ||
* first, calculate the current and last committed cacheline | ||
* as offsets from buffer start | ||
*/ | ||
uint32_t current_cacheline = dp_queue->write_offset / | ||
PLATFORM_DCACHE_ALIGN; | ||
|
||
/* Last used cacheline may not be filled completely, calculate cacheline | ||
* containing 1st free byte | ||
*/ | ||
uint32_t last_cacheline = (dp_queue->write_offset + commit_size + 1) / | ||
PLATFORM_DCACHE_ALIGN; | ||
uint32_t total_num_of_cachelines = dp_queue->data_buffer_size / | ||
PLATFORM_DCACHE_ALIGN; | ||
/* wrap-around? */ | ||
last_cacheline %= total_num_of_cachelines; | ||
|
||
/* now go one by one. | ||
* if current_cacheline == last_full_cacheline - nothing to do | ||
*/ | ||
while (current_cacheline != last_cacheline) { | ||
/* writeback / invalidate */ | ||
uint8_t __sparse_cache *ptr = dp_queue->data_buffer + | ||
(current_cacheline * PLATFORM_DCACHE_ALIGN); | ||
dcache_writeback_region(ptr, PLATFORM_DCACHE_ALIGN); | ||
|
||
/* mark data as available to read */ | ||
dp_queue->available_data += PLATFORM_DCACHE_ALIGN; | ||
/* get next cacheline */ | ||
current_cacheline = (current_cacheline + 1) % | ||
total_num_of_cachelines; | ||
} | ||
} else { | ||
/* not shared */ | ||
dp_queue->available_data += commit_size; | ||
} | ||
|
||
/* move write pointer */ | ||
dp_queue->free_space -= commit_size; | ||
dp_queue->write_offset = (dp_queue->write_offset + commit_size) % | ||
dp_queue->data_buffer_size; | ||
|
||
dp_queue_unlock(dp_queue, key); | ||
} | ||
|
||
return 0; | ||
} | ||
|
||
static size_t dp_queue_get_data_available(struct sof_source *source) | ||
{ | ||
struct dp_queue *dp_queue = dp_queue_get_from_source(source); | ||
|
||
/* access is read only, using uncached alias, no need to lock */ | ||
return dp_queue->available_data; | ||
} | ||
|
||
static int dp_queue_get_data(struct sof_source *source, size_t req_size, | ||
void **data_ptr, void **buffer_start, size_t *buffer_size) | ||
{ | ||
struct dp_queue *dp_queue = dp_queue_get_from_source(source); | ||
|
||
/* no need to lock, just reading data */ | ||
if (req_size > dp_queue_get_data_available(source)) | ||
return -ENODATA; | ||
|
||
/* | ||
* note! a sparse warning will be generated here till | ||
* https://github.com/thesofproject/sof/issues/8006 is implemented | ||
*/ | ||
*buffer_start = dp_queue->data_buffer; | ||
*buffer_size = dp_queue->data_buffer_size; | ||
*data_ptr = dp_queue->data_buffer + dp_queue->read_offset; | ||
|
||
/* clean cache in provided data range */ | ||
if (dp_queue_is_shared(dp_queue)) | ||
dp_queue_invalidate_shared(dp_queue, | ||
(__sparse_force void __sparse_cache *)*data_ptr, | ||
req_size); | ||
|
||
return 0; | ||
} | ||
|
||
static int dp_queue_release_data(struct sof_source *source, size_t free_size) | ||
{ | ||
struct dp_queue *dp_queue = dp_queue_get_from_source(source); | ||
|
||
if (free_size) { | ||
/* data consumed, free buffer space, no need for any special cache operations */ | ||
union lock_key key = dp_queue_lock(dp_queue); | ||
|
||
dp_queue->available_data -= free_size; | ||
dp_queue->free_space += free_size; | ||
dp_queue->read_offset = | ||
(dp_queue->read_offset + free_size) % dp_queue->data_buffer_size; | ||
|
||
dp_queue_unlock(dp_queue, key); | ||
} | ||
|
||
return 0; | ||
} | ||
|
||
static int dp_queue_set_ipc_params(struct dp_queue *dp_queue, | ||
struct sof_ipc_stream_params *params, | ||
bool force_update) | ||
{ | ||
if (dp_queue->hw_params_configured && !force_update) | ||
return 0; | ||
|
||
dp_queue->audio_stream_params.frame_fmt = params->frame_fmt; | ||
dp_queue->audio_stream_params.rate = params->rate; | ||
dp_queue->audio_stream_params.channels = params->channels; | ||
dp_queue->audio_stream_params.buffer_fmt = params->buffer_fmt; | ||
|
||
dp_queue->hw_params_configured = true; | ||
|
||
return 0; | ||
} | ||
|
||
/* | ||
* note! a sparse warning will be generated here till | ||
* https://github.com/thesofproject/sof/issues/8006 is implemented | ||
*/ | ||
|
||
static int dp_queue_set_ipc_params_source(struct sof_source *source, | ||
struct sof_ipc_stream_params *params, | ||
bool force_update) | ||
{ | ||
struct dp_queue *dp_queue = dp_queue_get_from_source(source); | ||
|
||
return dp_queue_set_ipc_params(dp_queue, params, force_update); | ||
} | ||
|
||
static int dp_queue_set_ipc_params_sink(struct sof_sink *sink, | ||
struct sof_ipc_stream_params *params, | ||
bool force_update) | ||
{ | ||
struct dp_queue *dp_queue = dp_queue_get_from_sink(sink); | ||
|
||
return dp_queue_set_ipc_params(dp_queue, params, force_update); | ||
} | ||
|
||
/* | ||
* note! a sparse warning will be generated here till | ||
* https://github.com/thesofproject/sof/issues/8006 is implemented | ||
*/ | ||
static const struct source_ops dp_queue_source_ops = { | ||
.get_data_available = dp_queue_get_data_available, | ||
.get_data = dp_queue_get_data, | ||
.release_data = dp_queue_release_data, | ||
.audio_set_ipc_params = dp_queue_set_ipc_params_source, | ||
}; | ||
|
||
static const struct sink_ops dp_queue_sink_ops = { | ||
.get_free_size = dp_queue_get_free_size, | ||
.get_buffer = dp_queue_get_buffer, | ||
.commit_buffer = dp_queue_commit_buffer, | ||
.audio_set_ipc_params = dp_queue_set_ipc_params_sink, | ||
}; | ||
|
||
struct dp_queue *dp_queue_create(uint32_t ibs, uint32_t obs, uint32_t flags) | ||
{ | ||
uint32_t align; | ||
struct dp_queue *dp_queue; | ||
|
||
/* allocate DP structure */ | ||
if (flags & DP_QUEUE_MODE_SHARED) | ||
dp_queue = rzalloc(SOF_MEM_ZONE_RUNTIME_SHARED, 0, SOF_MEM_CAPS_RAM, | ||
sizeof(*dp_queue)); | ||
else | ||
dp_queue = rzalloc(SOF_MEM_ZONE_RUNTIME, 0, SOF_MEM_CAPS_RAM, sizeof(*dp_queue)); | ||
if (!dp_queue) | ||
return NULL; | ||
|
||
dp_queue->flags = flags; | ||
|
||
/* initiate sink/source provide uncached pointers to audio_stream_params | ||
* | ||
* note! a sparse warning will be generated here till | ||
* https://github.com/thesofproject/sof/issues/8006 is implemented | ||
*/ | ||
source_init(dp_queue_get_source(dp_queue), &dp_queue_source_ops, | ||
&dp_queue->audio_stream_params); | ||
sink_init(dp_queue_get_sink(dp_queue), &dp_queue_sink_ops, | ||
&dp_queue->audio_stream_params); | ||
|
||
/* set original ibs/obs as required by the module */ | ||
sink_set_min_free(&dp_queue->sink_api, ibs); | ||
source_set_min_available(&dp_queue->source_api, obs); | ||
|
||
/* calculate required buffer size */ | ||
if (dp_queue_is_shared(dp_queue)) { | ||
ibs = MAX(ibs, PLATFORM_DCACHE_ALIGN); | ||
obs = MAX(obs, PLATFORM_DCACHE_ALIGN); | ||
align = PLATFORM_DCACHE_ALIGN; | ||
dp_queue->data_buffer_size = 3 * MAX(ibs, obs) + PLATFORM_DCACHE_ALIGN; | ||
/* reserve 1 cacheline to ensure exclusive use */ | ||
dp_queue->free_space = dp_queue->data_buffer_size - PLATFORM_DCACHE_ALIGN; | ||
} else { | ||
if ((ibs % obs == 0) && (obs % ibs == 0)) | ||
dp_queue->data_buffer_size = 2 * MAX(ibs, obs); | ||
else | ||
dp_queue->data_buffer_size = 3 * MAX(ibs, obs); | ||
align = 0; | ||
dp_queue->free_space = dp_queue->data_buffer_size; | ||
} | ||
dp_queue->available_data = 0; | ||
|
||
/* allocate data buffer - always in cached memory */ | ||
dp_queue->data_buffer = (__sparse_force __sparse_cache void *) | ||
rballoc_align(0, 0, dp_queue->data_buffer_size, align); | ||
if (!dp_queue->data_buffer) | ||
goto err; | ||
|
||
/* return allocated structure */ | ||
return dp_queue; | ||
err: | ||
rfree(dp_queue); | ||
return NULL; | ||
} | ||
|
||
void dp_queue_free(struct dp_queue *dp_queue) | ||
{ | ||
dp_queue_invalidate_shared(dp_queue, | ||
(__sparse_force void __sparse_cache *)dp_queue->data_buffer, | ||
dp_queue->data_buffer_size); | ||
|
||
rfree((__sparse_force void *)dp_queue->data_buffer); | ||
rfree(dp_queue); | ||
} | ||
|
||
struct sof_audio_stream_params *dp_queue_get_audio_params(struct dp_queue *dp_queue) | ||
{ | ||
return &dp_queue->audio_stream_params; | ||
} |
Oops, something went wrong.