Skip to content

Commit

Permalink
Use streamKeys for synchronizer
Browse files Browse the repository at this point in the history
  • Loading branch information
medengineer committed Oct 13, 2023
1 parent 1c088f8 commit c5823e9
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 126 deletions.
28 changes: 16 additions & 12 deletions Source/Processors/EventTranslator/EventTranslator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ void EventTranslator::updateSettings()

const uint16 streamId = stream->getStreamId();

synchronizer.addDataStream(streamId, stream->getSampleRate());
synchronizer.addDataStream(stream->getKey(), stream->getSampleRate());

EventChannel::Settings s{
EventChannel::Type::TTL,
Expand Down Expand Up @@ -119,40 +119,42 @@ void EventTranslator::process (AudioBuffer<float>& buffer)
void EventTranslator::handleTTLEvent(TTLEventPtr event)
{
const uint16 eventStream = event->getStreamId();
const String eventStreamKey = getDataStream(eventStream)->getKey();
const int ttlLine = event->getLine();
const int64 sampleNumber = event->getSampleNumber();

if (synchronizer.getSyncLine(eventStream) == ttlLine)
if (synchronizer.getSyncLine(eventStreamKey) == ttlLine)
{
synchronizer.addEvent(eventStream, ttlLine, sampleNumber);
synchronizer.addEvent(eventStreamKey, ttlLine, sampleNumber);

return;
}

if (eventStream == synchronizer.mainStreamId && synchronizer.isStreamSynced(eventStream))
if (eventStreamKey == synchronizer.mainStreamKey && synchronizer.isStreamSynced(eventStreamKey))
{

//std::cout << "TRANSLATE!" << std::endl;

const bool state = event->getState();

double timestamp = synchronizer.convertSampleNumberToTimestamp(eventStream, sampleNumber);
double timestamp = synchronizer.convertSampleNumberToTimestamp(eventStreamKey, sampleNumber);

for (auto stream : getDataStreams())
{

const uint16 streamId = stream->getStreamId();
const String streamKey = stream->getKey();

if (streamId == eventStream)
if (streamKey == eventStreamKey)
continue; // don't translate events back to the main stream

if (synchronizer.isStreamSynced(streamId))
if (synchronizer.isStreamSynced(streamKey))
{

//std::cout << "original sample number: " <<sampleNumber << std::endl;
//std::cout << "original timestamp: " <<timestamp << std::endl;

int64 newSampleNumber = synchronizer.convertTimestampToSampleNumber(streamId, timestamp);
int64 newSampleNumber = synchronizer.convertTimestampToSampleNumber(streamKey, timestamp);

//std::cout << "new sample number (" << streamId << "): " << newSampleNumber << std::endl;
//std::cout << std::endl;
Expand All @@ -179,11 +181,12 @@ void EventTranslator::saveCustomParametersToXml(XmlElement* xml)
{

const uint16 streamId = stream->getStreamId();
const String streamKey = stream->getKey();

XmlElement* streamXml = xml->createNewChildElement("STREAM");

streamXml->setAttribute("isMainStream", synchronizer.mainStreamId == streamId);
streamXml->setAttribute("sync_line", getSyncLine(streamId));
streamXml->setAttribute("isMainStream", synchronizer.mainStreamKey == streamKey);
streamXml->setAttribute("sync_line", getSyncLine(streamKey));
streamXml->setAttribute("name", stream->getName());
streamXml->setAttribute("source_node_id", stream->getSourceNodeId());
streamXml->setAttribute("sample_rate", stream->getSampleRate());
Expand Down Expand Up @@ -216,6 +219,7 @@ void EventTranslator::loadCustomParametersFromXml(XmlElement* xml)
{
int matchingIndex = findMatchingStreamParameters(stream);
const uint16 streamId = stream->getStreamId();
String streamKey = stream->getKey();

if (matchingIndex > -1)
{
Expand All @@ -231,10 +235,10 @@ void EventTranslator::loadCustomParametersFromXml(XmlElement* xml)
{
if (subNode->getBoolAttribute("isMainStream", false))
{
setMainDataStream(streamId);
setMainDataStream(streamKey);
}

setSyncLine(streamId, subNode->getIntAttribute("sync_line", 0));
setSyncLine(streamKey, subNode->getIntAttribute("sync_line", 0));

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void EventTranslatorEditor::updateSettings()

buttons.add(new SyncControlButton(proc,
name,
streamId,
stream->getKey(),
numLines));

buttons.getLast()->setBounds(18 + column * 25, 30 + row * 25, 18, 18);
Expand Down
10 changes: 10 additions & 0 deletions Source/Processors/GenericProcessor/GenericProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1652,6 +1652,16 @@ DataStream* GenericProcessor::getDataStream(uint16 streamId) const
return dataStreamMap.at(streamId);
}

DataStream* GenericProcessor::getDataStream(String streamKey) const
{
for (auto stream : dataStreams)
{
if (stream->getKey() == streamKey)
return stream;
}
return nullptr;
}

uint16 GenericProcessor::findSimilarStream(int sourceNodeId, String name, float sample_rate, bool sourceNodeIdMustMatch)
{

Expand Down
2 changes: 2 additions & 0 deletions Source/Processors/GenericProcessor/GenericProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,8 @@ class PLUGIN_API GenericProcessor : public GenericProcessorBase

DataStream* getDataStream(uint16 streamId) const;

DataStream* getDataStream(String streamKey) const;

uint16 findSimilarStream(int sourceNodeId, String name, float sample_rate, bool sourceNodeIdMustMatch = false);

virtual Array<const DataStream*> getStreamsForDestNode(GenericProcessor* processor);
Expand Down
64 changes: 40 additions & 24 deletions Source/Processors/RecordNode/RecordNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ void RecordNode::parameterValueChanged(Parameter* p)
String key = stream->getKey();
if (key == streamNames[((SelectedStreamParameter*)p)->getSelectedIndex()])
{
synchronizer.setMainDataStream(stream->getStreamId());
synchronizer.setMainDataStream(stream->getKey());
break;
}
}
Expand Down Expand Up @@ -194,11 +194,15 @@ void RecordNode::handleBroadcastMessage(String msg)
if (recordEvents && isRecording)
{

int64 messageSampleNumber = getFirstSampleNumberForBlock(synchronizer.mainStreamId);
String streamKey = synchronizer.mainStreamKey;

DataStream* mainStream = getDataStream(streamKey);

int64 messageSampleNumber = getFirstSampleNumberForBlock(mainStream->getStreamId());

TextEventPtr event = TextEvent::createTextEvent(getMessageChannel(), messageSampleNumber, msg);

double ts = synchronizer.convertSampleNumberToTimestamp(synchronizer.mainStreamId, messageSampleNumber);
double ts = synchronizer.convertSampleNumberToTimestamp(synchronizer.mainStreamKey, messageSampleNumber);

event->setTimestampInSeconds(ts);

Expand Down Expand Up @@ -437,8 +441,7 @@ void RecordNode::updateSettings()

LOGD("Record Node found stream: (", streamId, ") ", stream->getName());
//activeStreamIds.add(stream->getStreamId());
synchronizer.addDataStream(streamId,
stream->getSampleRate());
synchronizer.addDataStream(stream->getKey(), stream->getSampleRate());

fifoUsage[streamId] = 0.0f;

Expand Down Expand Up @@ -534,7 +537,7 @@ bool RecordNode::isSynchronized()
for (auto stream : dataStreams)
{

SyncStatus status = synchronizer.getStatus(stream->getStreamId());
SyncStatus status = synchronizer.getStatus(stream->getKey());

if (status != SYNCED)
return false;
Expand All @@ -553,7 +556,7 @@ bool RecordNode::startAcquisition()
{
eventChannels.add(new EventChannel(*messageChannel));
eventChannels.getLast()->addProcessor(this);
eventChannels.getLast()->setDataStream(getDataStream(synchronizer.mainStreamId), false);
eventChannels.getLast()->setDataStream(getDataStream(synchronizer.mainStreamKey), false);
}

return true;
Expand Down Expand Up @@ -611,7 +614,7 @@ void RecordNode::startRecording()
{
eventChannels.add(new EventChannel(*messageChannel));
eventChannels.getLast()->addProcessor(this);
eventChannels.getLast()->setDataStream(getDataStream(synchronizer.mainStreamId), false);
eventChannels.getLast()->setDataStream(getDataStream(synchronizer.mainStreamKey), false);
}

int lastSourceNodeId = -1;
Expand Down Expand Up @@ -734,15 +737,17 @@ void RecordNode::handleTTLEvent(TTLEventPtr event)

int64 sampleNumber = event->getSampleNumber();

synchronizer.addEvent(event->getStreamId(), event->getLine(), sampleNumber);
String streamKey = getDataStream(event->getStreamId())->getKey();

synchronizer.addEvent(streamKey, event->getLine(), sampleNumber);

if (recordEvents && isRecording)
{

size_t size = event->getChannelInfo()->getDataSize() + event->getChannelInfo()->getTotalEventMetadataSize() + EVENT_BASE_SIZE;

HeapBlock<char> buffer(size);
event->setTimestampInSeconds(synchronizer.convertSampleNumberToTimestamp(event->getStreamId(), sampleNumber));
event->setTimestampInSeconds(synchronizer.convertSampleNumberToTimestamp(streamKey, sampleNumber));
event->serialize(buffer, size);

eventQueue->addEvent(EventPacket(buffer, size), sampleNumber);
Expand All @@ -763,7 +768,9 @@ void RecordNode::handleEvent(const EventChannel* eventInfo, const EventPacket& p

int eventIndex = getIndexOfMatchingChannel(eventInfo);

Event::setTimestampInSeconds(packet, synchronizer.convertSampleNumberToTimestamp(eventInfo->getStreamId(), sampleNumber));
String streamKey = getDataStream(eventInfo->getStreamId())->getKey();

Event::setTimestampInSeconds(packet, synchronizer.convertSampleNumberToTimestamp(streamKey, sampleNumber));

eventQueue->addEvent(packet, sampleNumber, eventIndex);

Expand All @@ -779,7 +786,8 @@ void RecordNode::handleSpike(SpikePtr spike)

if (recordSpikes)
{
spike->setTimestampInSeconds(synchronizer.convertSampleNumberToTimestamp(spike->getStreamId(),
String streamKey = getDataStream(spike->getStreamId())->getKey();
spike->setTimestampInSeconds(synchronizer.convertSampleNumberToTimestamp(streamKey,
spike->getSampleNumber()));
writeSpike(spike, spike->getChannelInfo());
eventMonitor->bufferedSpikes++;
Expand Down Expand Up @@ -852,15 +860,16 @@ void RecordNode::process(AudioBuffer<float>& buffer)
streamIndex++;

const uint16 streamId = stream->getStreamId();
const String streamKey = stream->getKey();

uint32 numSamples = getNumSamplesInBlock(streamId);

int64 sampleNumber = getFirstSampleNumberForBlock(streamId);

if (numSamples > 0)
{
double first = synchronizer.convertSampleNumberToTimestamp(streamId, sampleNumber);
double second = synchronizer.convertSampleNumberToTimestamp(streamId, sampleNumber + 1);
double first = synchronizer.convertSampleNumberToTimestamp(streamKey, sampleNumber);
double second = synchronizer.convertSampleNumberToTimestamp(streamKey, sampleNumber + 1);

fifoUsage[streamId] = dataQueue->writeSynchronizedTimestamps(
first,
Expand Down Expand Up @@ -985,6 +994,7 @@ void RecordNode::clearRecordEngines()
void RecordNode::saveCustomParametersToXml(XmlElement* xml)
{

/*
xml->setAttribute ("path", dataDirectory.getFullPathName());
xml->setAttribute("engine", recordEngine->getEngineId());
xml->setAttribute ("recordEvents", recordEvents);
Expand All @@ -1000,14 +1010,17 @@ void RecordNode::saveCustomParametersToXml(XmlElement* xml)
for (auto stream : getDataStreams())
{
const uint16 streamId = stream->getStreamId();
//const uint16 streamId = stream->getStreamId();
const String streamKey = stream->getKey();
if (recordContinuousChannels[streamId].size() > 0)
auto selectedChannels = ((MaskChannelsParameter*)stream->getParameter("channels"))->getChannelStates();
if (selectedChannels.size() > 0)
{
XmlElement* streamXml = xml->createNewChildElement("STREAM");
streamXml->setAttribute("isMainStream", synchronizer.mainStreamId == streamId);
streamXml->setAttribute("sync_line", getSyncLine(streamId));
streamXml->setAttribute("isMainStream", synchronizer.mainStreamKey == streamKey);
streamXml->setAttribute("sync_line", getSyncLine(streamKey));
streamXml->setAttribute("name", stream->getName());
streamXml->setAttribute("source_node_id", stream->getSourceNodeId());
streamXml->setAttribute("sample_rate", stream->getSampleRate());
Expand All @@ -1020,9 +1033,9 @@ void RecordNode::saveCustomParametersToXml(XmlElement* xml)
bool allOn = true;
bool allOff = true;
for (int ch = 0; ch < recordContinuousChannels[streamId].size(); ch++)
for (int ch = 0; ch < selectedChannels.size(); ch++)
{
bool state = recordContinuousChannels[streamId][ch];
bool state = selectedChannels[ch];
if (!state)
allOn = false;
Expand All @@ -1043,13 +1056,14 @@ void RecordNode::saveCustomParametersToXml(XmlElement* xml)
}
}
*/
}


void RecordNode::loadCustomParametersFromXml(XmlElement* xml)
{

return;
/*
String savedPath = xml->getStringAttribute("path");
Expand Down Expand Up @@ -1092,7 +1106,8 @@ void RecordNode::loadCustomParametersFromXml(XmlElement* xml)
if (findMatchingStreamParameters(stream) > -1)
{
const uint16 streamId = stream->getStreamId();
//const uint16 streamId = stream->getStreamId();
const String streamKey = stream->getKey();
for (auto* subNode : xml->getChildIterator())
{
Expand All @@ -1103,10 +1118,10 @@ void RecordNode::loadCustomParametersFromXml(XmlElement* xml)
if (subNode->getBoolAttribute("isMainStream", false))
{
setMainDataStream(streamId);
setMainDataStream(stream->getKey());
}
setSyncLine(streamId, subNode->getIntAttribute("sync_line", 0));
setSyncLine(stream->getKey(), subNode->getIntAttribute("sync_line", 0));
String recordState = subNode->getStringAttribute("recording_state", "ALL");
Expand Down Expand Up @@ -1141,5 +1156,6 @@ void RecordNode::loadCustomParametersFromXml(XmlElement* xml)
}
}
*/

}
10 changes: 6 additions & 4 deletions Source/Processors/RecordNode/RecordNodeEditor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,15 +257,17 @@ SyncChannelsParameterEditor::SyncChannelsParameterEditor(RecordNode* rn, Paramet

recordNode = rn;

uint64 streamId = ((DataStream*)param->getOwner())->getStreamId();
String streamName = ((DataStream*)param->getOwner())->getName();
String sourceNodeId = String(((DataStream*)param->getOwner())->getSourceNodeId());
DataStream* stream = ((DataStream*)param->getOwner());

uint64 streamId = stream->getStreamId();
String streamName = stream->getName();
int sourceNodeId = stream->getSourceNodeId();

int nEvents = 8; //TODO: Better way to determine how many TTL lines are available

syncControlButton = std::make_unique<SyncControlButton>(recordNode,
recordNode->getDataStream(streamId)->getName(),
streamId, 8);
stream->getKey(), 8);
syncControlButton->setTooltip("Configure synchronization settings for this stream");
syncControlButton->addListener(this);
syncControlButton->setBounds(0, 0, 15, 15);
Expand Down
Loading

0 comments on commit c5823e9

Please sign in to comment.