Skip to content

Commit

Permalink
DP: connect dp_queue to processing DP modules
Browse files Browse the repository at this point in the history
DP components need to work flawlessly when run on a different
core than that of its connected peer modules.

To achieve this, a cross core producer-consumer safe
dp_queue should be used

DP queue can only be connected to modules that use
sink/src interface, so DP modules need to use it
obligatory.

To connect dp_queue into modules chain, double buffering
method is used:
 - in LL task DP module is processed as an LL module, but the
   copy method is copying data from/to audio_streams
   to/from dp_queues
 - the main DP module processing takes place in DP task
   (in separate Zephyr thread). The tread may be bind to
   separate core

Signed-off-by: Marcin Szkudlinski <[email protected]>
  • Loading branch information
marcinszkudlinski authored and kv2019i committed Sep 26, 2023
1 parent e441f17 commit cbc04be
Show file tree
Hide file tree
Showing 9 changed files with 355 additions and 93 deletions.
22 changes: 19 additions & 3 deletions src/audio/component.c
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,25 @@ int comp_copy(struct comp_dev *dev)

assert(dev->drv->ops.copy);

/* copy only if we are the owner of the LL component */
if (dev->ipc_config.proc_domain == COMP_PROCESSING_DOMAIN_LL &&
cpu_is_me(dev->ipc_config.core)) {
/* copy only if we are the owner of component OR this is DP component
*
* DP components (modules) require two stage processing:
*
* LL_mod -> [comp_buffer -> dp_queue] -> dp_mod -> [dp_queue -> comp_buffer] -> LL_mod
*
* - in first step (it means - now) the pipeline must copy source data from comp_buffer
* to dp_queue and result data from dp_queue to comp_buffer
*
* - second step will be performed by a thread specific to the DP module - DP module
* will take data from input dpQueue (using source API) , process it
* and put in output DP queue (using sink API)
*
* this allows the current pipeline structure to see a DP module as a "normal" LL
*
* to be removed when pipeline 2.0 is ready
*/
if (cpu_is_me(dev->ipc_config.core) ||
dev->ipc_config.proc_domain == COMP_PROCESSING_DOMAIN_DP) {
#if CONFIG_PERFORMANCE_COUNTERS
perf_cnt_init(&dev->pcd);
#endif
Expand Down
265 changes: 257 additions & 8 deletions src/audio/module_adapter/module_adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include <sof/audio/module_adapter/module/generic.h>
#include <sof/audio/sink_api.h>
#include <sof/audio/source_api.h>
#include <sof/audio/sink_source_utils.h>
#include <sof/audio/dp_queue.h>
#include <sof/audio/pipeline.h>
#include <sof/common.h>
#include <sof/platform.h>
Expand Down Expand Up @@ -71,7 +73,16 @@ struct comp_dev *module_adapter_new(const struct comp_driver *drv,
dev->ipc_config = *config;
dev->drv = drv;

mod = rzalloc(SOF_MEM_ZONE_RUNTIME, 0, SOF_MEM_CAPS_RAM, sizeof(*mod));
/* allocate module information.
* for DP shared modules this struct must be accessible from all cores
* Unfortunately at this point there's no information of components the module
* will be bound to. So we need to allocate shared memory for each DP module
* To be removed when pipeline 2.0 is ready
*/
enum mem_zone zone = config->proc_domain == COMP_PROCESSING_DOMAIN_DP ?
SOF_MEM_ZONE_RUNTIME_SHARED : SOF_MEM_ZONE_RUNTIME;

mod = rzalloc(zone, 0, SOF_MEM_CAPS_RAM, sizeof(*mod));
if (!mod) {
comp_err(dev, "module_adapter_new(), failed to allocate memory for module");
rfree(dev);
Expand Down Expand Up @@ -161,6 +172,12 @@ struct comp_dev *module_adapter_new(const struct comp_driver *drv,
goto err;
}

#if CONFIG_ZEPHYR_DP_SCHEDULER
/* create a task for DP processing */
if (config->proc_domain == COMP_PROCESSING_DOMAIN_DP)
pipeline_comp_dp_task_init(dev);
#endif /* CONFIG_ZEPHYR_DP_SCHEDULER */

#if CONFIG_IPC_MAJOR_4
dst->init_data = NULL;
#endif
Expand Down Expand Up @@ -208,6 +225,133 @@ static int module_adapter_sink_src_prepare(struct comp_dev *dev)
return ret;
}

#if CONFIG_ZEPHYR_DP_SCHEDULER
static int module_adapter_dp_queue_prepare(struct comp_dev *dev)
{
int dp_mode = dev->is_shared ? DP_QUEUE_MODE_SHARED : DP_QUEUE_MODE_LOCAL;
struct processing_module *mod = comp_get_drvdata(dev);
struct dp_queue *dp_queue;
struct list_item *blist;
int ret;
int i;

/* for DP processing we need to create a DP QUEUE for each module input/output
* till pipeline2.0 is ready, DP processing requires double buffering
*
* first, set all parameters by calling "module prepare" with pointers to
* "main" audio_stream buffers
*/
ret = module_adapter_sink_src_prepare(dev);
if (ret)
return ret;

/*
* second step - create a "shadow" cross-core DpQueue for existing buffers
* and copy stream parameters to shadow buffers
*/
i = 0;
list_init(&mod->dp_queue_ll_to_dp_list);
list_for_item(blist, &dev->bsource_list) {
struct comp_buffer *source_buffer =
container_of(blist, struct comp_buffer, sink_list);

/* copy IBS & OBS from buffer to be shadowed */
size_t min_available =
source_get_min_available(audio_stream_get_source(&source_buffer->stream));
size_t min_free_space =
sink_get_min_free_space(audio_stream_get_sink(&source_buffer->stream));

/* create a shadow dp queue */
dp_queue = dp_queue_create(min_available, min_free_space, dp_mode);

if (!dp_queue)
goto err;
dp_queue_append_to_list(dp_queue, &mod->dp_queue_ll_to_dp_list);

/* it will override source pointers set by module_adapter_sink_src_prepare
* module will use shadow dpQueue for processing
*/
mod->sources[i] = dp_queue_get_source(dp_queue);

/* copy parameters from buffer to be shadowed */
memcpy_s(&dp_queue->audio_stream_params,
sizeof(dp_queue->audio_stream_params),
&source_buffer->stream.runtime_stream_params,
sizeof(source_buffer->stream.runtime_stream_params));
i++;
}
mod->num_of_sources = i;

i = 0;
list_init(&mod->dp_queue_dp_to_ll_list);
list_for_item(blist, &dev->bsink_list) {
struct comp_buffer *sink_buffer =
container_of(blist, struct comp_buffer, source_list);

/* copy IBS & OBS from buffer to be shadowed */
size_t min_available =
source_get_min_available(audio_stream_get_source(&sink_buffer->stream));
size_t min_free_space =
sink_get_min_free_space(audio_stream_get_sink(&sink_buffer->stream));

/* create a shadow dp queue */
dp_queue = dp_queue_create(min_available, min_free_space, dp_mode);

if (!dp_queue)
goto err;

dp_queue_append_to_list(dp_queue, &mod->dp_queue_dp_to_ll_list);
/* it will override sink pointers set by module_adapter_sink_src_prepare
* module will use shadow dpQueue for processing
*/
mod->sinks[i] = dp_queue_get_sink(dp_queue);

/* copy parameters from buffer to be shadowed */
memcpy_s(&dp_queue->audio_stream_params,
sizeof(dp_queue->audio_stream_params),
&sink_buffer->stream.runtime_stream_params,
sizeof(sink_buffer->stream.runtime_stream_params));

i++;
}
mod->num_of_sinks = i;

return 0;

err:;
struct list_item *dp_queue_list_item;
struct list_item *tmp;

i = 0;
list_for_item_safe(dp_queue_list_item, tmp, &mod->dp_queue_dp_to_ll_list) {
struct dp_queue *dp_queue =
container_of(dp_queue_list_item, struct dp_queue, list);

/* dp free will also remove the queue from a list */
dp_queue_free(dp_queue);
mod->sources[i++] = NULL;
}
mod->num_of_sources = 0;

i = 0;
list_for_item_safe(dp_queue_list_item, tmp, &mod->dp_queue_ll_to_dp_list) {
struct dp_queue *dp_queue =
container_of(dp_queue_list_item, struct dp_queue, list);

dp_queue_free(dp_queue);
mod->sinks[i++] = NULL;
}
mod->num_of_sinks = 0;

return -ENOMEM;
}
#else
static inline int module_adapter_dp_queue_prepare(struct comp_dev *dev)
{
return -EINVAL;
}
#endif /* CONFIG_ZEPHYR_DP_SCHEDULER */

/*
* \brief Prepare the module
* \param[in] dev - component device pointer.
Expand All @@ -230,11 +374,21 @@ int module_adapter_prepare(struct comp_dev *dev)
comp_dbg(dev, "module_adapter_prepare() start");

/* Prepare module */
if (IS_PROCESSING_MODE_SINK_SOURCE(mod))
if (IS_PROCESSING_MODE_SINK_SOURCE(mod) &&
mod->dev->ipc_config.proc_domain == COMP_PROCESSING_DOMAIN_DP)
ret = module_adapter_dp_queue_prepare(dev);

else if (IS_PROCESSING_MODE_SINK_SOURCE(mod) &&
mod->dev->ipc_config.proc_domain == COMP_PROCESSING_DOMAIN_LL)
ret = module_adapter_sink_src_prepare(dev);
else

else if ((IS_PROCESSING_MODE_RAW_DATA(mod) || IS_PROCESSING_MODE_AUDIO_STREAM(mod)) &&
mod->dev->ipc_config.proc_domain == COMP_PROCESSING_DOMAIN_LL)
ret = module_prepare(mod, NULL, 0, NULL, 0);

else
ret = -EINVAL;

if (ret) {
if (ret != PPL_STATUS_PATH_STOP)
comp_err(dev, "module_adapter_prepare() error %x: module prepare failed",
Expand Down Expand Up @@ -273,13 +427,15 @@ int module_adapter_prepare(struct comp_dev *dev)
mod->period_bytes = audio_stream_period_bytes(&sink->stream, dev->frames);
comp_dbg(dev, "module_adapter_prepare(): got period_bytes = %u", mod->period_bytes);

mod->num_of_sources = 0;
mod->num_of_sinks = 0;
/* no more to do for sink/source mode */
if (IS_PROCESSING_MODE_SINK_SOURCE(mod))
return 0;

/*
* compute number of input buffers and make the source_info shared if the module is on a
* different core than any of it's sources
*/
mod->num_of_sources = 0;
list_for_item(blist, &dev->bsource_list) {
struct comp_buffer *buf;
struct comp_dev *source;
Expand All @@ -294,6 +450,7 @@ int module_adapter_prepare(struct comp_dev *dev)
}

/* compute number of output buffers */
mod->num_of_sinks = 0;
list_for_item(blist, &dev->bsink_list)
mod->num_of_sinks++;

Expand Down Expand Up @@ -964,6 +1121,67 @@ static int module_adapter_audio_stream_type_copy(struct comp_dev *dev)
return ret;
}

#if CONFIG_ZEPHYR_DP_SCHEDULER
static int module_adapter_copy_dp_queues(struct comp_dev *dev)
{
/*
* copy data from component audio streams to dp_queue
* DP module processing itself will take place in DP thread
* This is an adapter, to be removed when pipeline2.0 is ready
*/
struct processing_module *mod = comp_get_drvdata(dev);
struct dp_queue *dp_queue;
struct list_item *blist;
int err;

dp_queue = dp_queue_get_first_item(&mod->dp_queue_ll_to_dp_list);
list_for_item(blist, &dev->bsource_list) {
/* input - we need to copy data from audio_stream (as source)
* to dp_queue (as sink)
*/
assert(dp_queue);
struct comp_buffer *buffer =
container_of(blist, struct comp_buffer, sink_list);
struct sof_source *data_src = audio_stream_get_source(&buffer->stream);
struct sof_sink *data_sink = dp_queue_get_sink(dp_queue);
uint32_t to_copy = MIN(sink_get_free_size(data_sink),
source_get_data_available(data_src));

err = source_to_sink_copy(data_src, data_sink, true, to_copy);
if (err)
return err;

dp_queue = dp_queue_get_next_item(dp_queue);
}

dp_queue = dp_queue_get_first_item(&mod->dp_queue_dp_to_ll_list);
list_for_item(blist, &dev->bsink_list) {
/* output - we need to copy data from dp_queue (as source)
* to audio_stream (as sink)
*/
assert(dp_queue);
struct comp_buffer *buffer =
container_of(blist, struct comp_buffer, source_list);
struct sof_sink *data_sink = audio_stream_get_sink(&buffer->stream);
struct sof_source *data_src = dp_queue_get_source(dp_queue);
uint32_t to_copy = MIN(sink_get_free_size(data_sink),
source_get_data_available(data_src));

err = source_to_sink_copy(data_src, data_sink, true, to_copy);
if (err)
return err;

dp_queue = dp_queue_get_next_item(dp_queue);
}
return 0;
}
#else
static inline int module_adapter_copy_dp_queues(struct comp_dev *dev)
{
return -ENOTSUP;
}
#endif /* CONFIG_ZEPHYR_DP_SCHEDULER */

static int module_adapter_sink_source_copy(struct comp_dev *dev)
{
struct processing_module *mod = comp_get_drvdata(dev);
Expand Down Expand Up @@ -1105,8 +1323,13 @@ int module_adapter_copy(struct comp_dev *dev)
if (IS_PROCESSING_MODE_RAW_DATA(mod))
return module_adapter_raw_data_type_copy(dev);

if (IS_PROCESSING_MODE_SINK_SOURCE(mod))
return module_adapter_sink_source_copy(dev);
if (IS_PROCESSING_MODE_SINK_SOURCE(mod)) {
if (mod->dev->ipc_config.proc_domain == COMP_PROCESSING_DOMAIN_DP)
return module_adapter_copy_dp_queues(dev);
else
return module_adapter_sink_source_copy(dev);

}

comp_err(dev, "module_adapter_copy(): unknown processing_data_type");
return -EINVAL;
Expand Down Expand Up @@ -1335,11 +1558,37 @@ int module_adapter_reset(struct comp_dev *dev)

mod->num_of_sources = 0;
mod->num_of_sinks = 0;
} else if (IS_PROCESSING_MODE_SINK_SOURCE(mod)) {
}
#if CONFIG_ZEPHYR_DP_SCHEDULER
if (IS_PROCESSING_MODE_SINK_SOURCE(mod) &&
mod->dev->ipc_config.proc_domain == COMP_PROCESSING_DOMAIN_DP) {
/* for DP processing - free DP Queues */
struct list_item *dp_queue_list_item;
struct list_item *tmp;

list_for_item_safe(dp_queue_list_item, tmp, &mod->dp_queue_dp_to_ll_list) {
struct dp_queue *dp_queue =
container_of(dp_queue_list_item, struct dp_queue, list);

/* dp free will also remove the queue from a list */
dp_queue_free(dp_queue);
}
list_for_item_safe(dp_queue_list_item, tmp, &mod->dp_queue_ll_to_dp_list) {
struct dp_queue *dp_queue =
container_of(dp_queue_list_item, struct dp_queue, list);

dp_queue_free(dp_queue);
}
}
#endif /* CONFIG_ZEPHYR_DP_SCHEDULER */
if (IS_PROCESSING_MODE_SINK_SOURCE(mod)) {
/* for both LL and DP processing */
for (int i = 0; i < mod->num_of_sources; i++)
mod->sources[i] = NULL;
for (int i = 0; i < mod->num_of_sinks; i++)
mod->sinks[i] = NULL;
mod->num_of_sinks = 0;
mod->num_of_sources = 0;
}

mod->total_data_consumed = 0;
Expand Down
Loading

0 comments on commit cbc04be

Please sign in to comment.