Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pipeline 2.0: Introducing Data Processing Queue #8020

Merged
merged 2 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/audio/audio_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ static size_t audio_stream_get_data_available(struct sof_source *source)
}
marcinszkudlinski marked this conversation as resolved.
Show resolved Hide resolved

static int audio_stream_get_data(struct sof_source *source, size_t req_size,
void **data_ptr, void **buffer_start, size_t *buffer_size)
void const **data_ptr, void const **buffer_start,
size_t *buffer_size)
{
struct audio_stream *audio_stream = container_of(source, struct audio_stream, source_api);
struct comp_buffer *buffer = container_of(audio_stream, struct comp_buffer, stream);
Expand Down
300 changes: 300 additions & 0 deletions src/audio/dp_queue.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
// SPDX-License-Identifier: BSD-3-Clause
marcinszkudlinski marked this conversation as resolved.
Show resolved Hide resolved
marcinszkudlinski marked this conversation as resolved.
Show resolved Hide resolved
//
// Copyright(c) 2023 Intel Corporation. All rights reserved.
//

#include <sof/common.h>
#include <sof/trace/trace.h>
#include <sof/lib/uuid.h>

#include <sof/audio/dp_queue.h>

#include <rtos/alloc.h>
#include <ipc/topology.h>

LOG_MODULE_REGISTER(dp_queue, CONFIG_SOF_LOG_LEVEL);

/* 393608d8-4188-11ee-be56-0242ac120002 */
DECLARE_SOF_RT_UUID("dp_queue", dp_queue_uuid, 0x393608d8, 0x4188, 0x11ee,
0xbe, 0x56, 0x02, 0x42, 0xac, 0x12, 0x20, 0x02);
DECLARE_TR_CTX(dp_queue_tr, SOF_UUID(dp_queue_uuid), LOG_LEVEL_INFO);

static inline uint8_t __sparse_cache *dp_queue_buffer_end(struct dp_queue *dp_queue)
{
return dp_queue->_data_buffer + dp_queue->data_buffer_size;
}

static inline struct dp_queue *dp_queue_from_sink(struct sof_sink *sink)
{
return container_of(sink, struct dp_queue, _sink_api);
}

static inline struct dp_queue *dp_queue_from_source(struct sof_source *source)
{
marcinszkudlinski marked this conversation as resolved.
Show resolved Hide resolved
return container_of(source, struct dp_queue, _source_api);
}

static inline void dp_queue_invalidate_shared(struct dp_queue *dp_queue,
void __sparse_cache *ptr, size_t size)
{
/* no cache required in case of not shared queue */
if (!dp_queue_is_shared(dp_queue))
return;

/* wrap-around? */
if ((uintptr_t)ptr + size > (uintptr_t)dp_queue_buffer_end(dp_queue)) {
/* writeback till the end of circular buffer */
dcache_invalidate_region
(ptr, (uintptr_t)dp_queue_buffer_end(dp_queue) - (uintptr_t)ptr);
size -= (uintptr_t)dp_queue_buffer_end(dp_queue) - (uintptr_t)ptr;
lyakh marked this conversation as resolved.
Show resolved Hide resolved
ptr = dp_queue->_data_buffer;
}
/* invalidate rest of data */
dcache_invalidate_region(ptr, size);
}

static inline void dp_queue_writeback_shared(struct dp_queue *dp_queue,
void __sparse_cache *ptr, size_t size)
{
/* no cache required in case of not shared queue */
if (!dp_queue_is_shared(dp_queue))
return;

/* wrap-around? */
if ((uintptr_t)ptr + size > (uintptr_t)dp_queue_buffer_end(dp_queue)) {
/* writeback till the end of circular buffer */
dcache_writeback_region
(ptr, (uintptr_t)dp_queue_buffer_end(dp_queue) - (uintptr_t)ptr);
size -= (uintptr_t)dp_queue_buffer_end(dp_queue) - (uintptr_t)ptr;
ptr = dp_queue->_data_buffer;
}
/* writeback rest of data */
dcache_writeback_region(ptr, size);
}

static inline
uint8_t __sparse_cache *dp_queue_get_pointer(struct dp_queue *dp_queue, uint32_t offset)
{
/* check if offset is not in "double area"
* lines below do a quicker version of offset %= dp_queue->data_buffer_size;
*/
if (offset >= dp_queue->data_buffer_size)
offset -= dp_queue->data_buffer_size;
marcinszkudlinski marked this conversation as resolved.
Show resolved Hide resolved
return dp_queue->_data_buffer + offset;
}

static inline
uint32_t dp_queue_inc_offset(struct dp_queue *dp_queue, uint32_t offset, uint32_t inc)
{
assert(inc <= dp_queue->data_buffer_size);
offset += inc;
/* wrap around ? 2*size because of "double area" */
if (offset >= 2 * dp_queue->data_buffer_size)
offset -= 2 * dp_queue->data_buffer_size;
marcinszkudlinski marked this conversation as resolved.
Show resolved Hide resolved
return offset;
}

static inline
size_t _dp_queue_get_data_available(struct dp_queue *dp_queue)
{
int32_t avail_data = dp_queue->_write_offset - dp_queue->_read_offset;
marcinszkudlinski marked this conversation as resolved.
Show resolved Hide resolved
/* wrap around ? 2*size because of "double area" */
if (avail_data < 0)
avail_data = 2 * dp_queue->data_buffer_size + avail_data;

return avail_data;
}

static size_t dp_queue_get_data_available(struct sof_source *source)
{
struct dp_queue *dp_queue = dp_queue_from_source(source);

CORE_CHECK_STRUCT(dp_queue);
return _dp_queue_get_data_available(dp_queue);
}

static size_t dp_queue_get_free_size(struct sof_sink *sink)
{
struct dp_queue *dp_queue = dp_queue_from_sink(sink);

CORE_CHECK_STRUCT(dp_queue);
return dp_queue->data_buffer_size - _dp_queue_get_data_available(dp_queue);
}

static int dp_queue_get_buffer(struct sof_sink *sink, size_t req_size,
void **data_ptr, void **buffer_start, size_t *buffer_size)
{
struct dp_queue *dp_queue = dp_queue_from_sink(sink);

CORE_CHECK_STRUCT(dp_queue);
if (req_size > dp_queue_get_free_size(sink))
return -ENODATA;

/* note, __sparse_force is to be removed once sink/src use __sparse_cache for data ptrs */
*data_ptr = (__sparse_force void *)dp_queue_get_pointer(dp_queue, dp_queue->_write_offset);
*buffer_start = (__sparse_force void *)dp_queue->_data_buffer;
*buffer_size = dp_queue->data_buffer_size;

/* no need to invalidate cache - buffer is to be written only */
return 0;
}

static int dp_queue_commit_buffer(struct sof_sink *sink, size_t commit_size)
{
struct dp_queue *dp_queue = dp_queue_from_sink(sink);

CORE_CHECK_STRUCT(dp_queue);
if (commit_size) {
dp_queue_writeback_shared(dp_queue,
dp_queue_get_pointer(dp_queue, dp_queue->_write_offset),
commit_size);

/* move write pointer */
dp_queue->_write_offset =
dp_queue_inc_offset(dp_queue, dp_queue->_write_offset, commit_size);
}

return 0;
}

static int dp_queue_get_data(struct sof_source *source, size_t req_size,
void const **data_ptr, void const **buffer_start, size_t *buffer_size)
{
struct dp_queue *dp_queue = dp_queue_from_source(source);
__sparse_cache void *_data_ptr;
marcinszkudlinski marked this conversation as resolved.
Show resolved Hide resolved

CORE_CHECK_STRUCT(dp_queue);
if (req_size > dp_queue_get_data_available(source))
return -ENODATA;

_data_ptr = dp_queue_get_pointer(dp_queue, dp_queue->_read_offset);

/* clean cache in provided data range */
dp_queue_invalidate_shared(dp_queue, _data_ptr, req_size);

*buffer_start = (__sparse_force void *)dp_queue->_data_buffer;
*buffer_size = dp_queue->data_buffer_size;
*data_ptr = (__sparse_force void *)_data_ptr;

return 0;
}

static int dp_queue_release_data(struct sof_source *source, size_t free_size)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a static function, so not too important, but maybe a name like "update_read" would be clearer - we don't really release data, we release buffer space

{
struct dp_queue *dp_queue = dp_queue_from_source(source);

CORE_CHECK_STRUCT(dp_queue);
if (free_size) {
/* data consumed, free buffer space, no need for any special cache operations */
dp_queue->_read_offset =
dp_queue_inc_offset(dp_queue, dp_queue->_read_offset, free_size);
}

return 0;
}

static int dp_queue_set_ipc_params(struct dp_queue *dp_queue,
struct sof_ipc_stream_params *params,
bool force_update)
{
CORE_CHECK_STRUCT(dp_queue);
if (dp_queue->_hw_params_configured && !force_update)
return 0;

dp_queue->audio_stream_params.frame_fmt = params->frame_fmt;
dp_queue->audio_stream_params.rate = params->rate;
dp_queue->audio_stream_params.channels = params->channels;
dp_queue->audio_stream_params.buffer_fmt = params->buffer_fmt;

dp_queue->_hw_params_configured = true;

return 0;
}

static int dp_queue_set_ipc_params_source(struct sof_source *source,
struct sof_ipc_stream_params *params,
bool force_update)
{
struct dp_queue *dp_queue = dp_queue_from_source(source);

CORE_CHECK_STRUCT(dp_queue);
return dp_queue_set_ipc_params(dp_queue, params, force_update);
}

static int dp_queue_set_ipc_params_sink(struct sof_sink *sink,
struct sof_ipc_stream_params *params,
bool force_update)
{
struct dp_queue *dp_queue = dp_queue_from_sink(sink);

CORE_CHECK_STRUCT(dp_queue);
return dp_queue_set_ipc_params(dp_queue, params, force_update);
}

static const struct source_ops dp_queue_source_ops = {
.get_data_available = dp_queue_get_data_available,
.get_data = dp_queue_get_data,
.release_data = dp_queue_release_data,
.audio_set_ipc_params = dp_queue_set_ipc_params_source,
};

static const struct sink_ops dp_queue_sink_ops = {
.get_free_size = dp_queue_get_free_size,
.get_buffer = dp_queue_get_buffer,
.commit_buffer = dp_queue_commit_buffer,
.audio_set_ipc_params = dp_queue_set_ipc_params_sink,
};

struct dp_queue *dp_queue_create(size_t ibs, size_t obs, uint32_t flags)
{
struct dp_queue *dp_queue;

/* allocate DP structure */
if (flags & DP_QUEUE_MODE_SHARED)
dp_queue = rzalloc(SOF_MEM_ZONE_RUNTIME_SHARED, 0, SOF_MEM_CAPS_RAM,
sizeof(*dp_queue));
else
dp_queue = rzalloc(SOF_MEM_ZONE_RUNTIME, 0, SOF_MEM_CAPS_RAM, sizeof(*dp_queue));
marcinszkudlinski marked this conversation as resolved.
Show resolved Hide resolved
if (!dp_queue)
return NULL;

dp_queue->_flags = flags;

CORE_CHECK_STRUCT_INIT(dp_queue, flags & DP_QUEUE_MODE_SHARED);

/* initiate sink/source */
source_init(dp_queue_get_source(dp_queue), &dp_queue_source_ops,
&dp_queue->audio_stream_params);
sink_init(dp_queue_get_sink(dp_queue), &dp_queue_sink_ops,
&dp_queue->audio_stream_params);

/* set obs/ibs in sink/source interfaces */
sink_set_obs(&dp_queue->_sink_api, obs);
source_set_ibs(&dp_queue->_source_api, ibs);

uint32_t max_ibs_obs = MAX(ibs, obs);
uint32_t min_ibs_obs = MIN(ibs, obs);
marcinszkudlinski marked this conversation as resolved.
Show resolved Hide resolved

/* calculate required buffer size */
if (max_ibs_obs % min_ibs_obs == 0)
dp_queue->data_buffer_size = 2 * max_ibs_obs;
else
dp_queue->data_buffer_size = 3 * max_ibs_obs;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't understand this calculation...

Copy link
Contributor Author

@marcinszkudlinski marcinszkudlinski Sep 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact I should reconsider 3x as it was estimated in first implementation when data needed to be passed in chunks
of cacheline size.
Now I went trough the same example I was analyzing before and probably 3* is not needed anymore

Will double check it and compare with reference FW and probably remove in next commit


/* allocate data buffer - always in cached memory alias */
dp_queue->data_buffer_size = ALIGN_UP(dp_queue->data_buffer_size, PLATFORM_DCACHE_ALIGN);
dp_queue->_data_buffer = (__sparse_force __sparse_cache void *)
rballoc_align(0, 0, dp_queue->data_buffer_size, PLATFORM_DCACHE_ALIGN);
if (!dp_queue->_data_buffer)
goto err;

tr_info(&dp_queue_tr, "DpQueue created, shared: %u ibs: %u obs %u, size %u",
dp_queue_is_shared(dp_queue), ibs, obs, dp_queue->data_buffer_size);

/* return a pointer to allocated structure */
return dp_queue;
err:
tr_err(&dp_queue_tr, "DpQueue creation failure");
rfree(dp_queue);
return NULL;
}
10 changes: 5 additions & 5 deletions src/audio/sink_source_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
int source_to_sink_copy(struct sof_source *source,
struct sof_sink *sink, bool free, size_t size)
{
uint8_t *src_ptr;
uint8_t *src_begin;
uint8_t *src_end;
uint8_t const *src_ptr;
uint8_t const *src_begin;
uint8_t const *src_end;
uint8_t *dst_ptr;
uint8_t *dst_begin;
uint8_t *dst_end;
Expand All @@ -32,8 +32,8 @@ int source_to_sink_copy(struct sof_source *source,
return -ENOSPC;

ret = source_get_data(source, size,
(void **)&src_ptr,
(void **)&src_begin,
(void const **)&src_ptr,
(void const **)&src_begin,
&src_size);
if (ret)
return ret;
Expand Down
2 changes: 1 addition & 1 deletion src/audio/source_api_helper.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ size_t source_get_data_available(struct sof_source *source)
}

int source_get_data(struct sof_source *source, size_t req_size,
void **data_ptr, void **buffer_start, size_t *buffer_size)
void const **data_ptr, void const **buffer_start, size_t *buffer_size)
{
int ret;

Expand Down
Loading
Loading