Skip to content

Commit

Permalink
Merge pull request #614 from ess-dmsc/meta_data
Browse files Browse the repository at this point in the history
Meta data
  • Loading branch information
SkyToGround authored Nov 24, 2021
2 parents 44e0883 + 743fcb6 commit 7f3d18c
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 8 deletions.
1 change: 1 addition & 0 deletions changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
([#607](https://github.com/ess-dmsc/kafka-to-nexus/pull/607))
- Fixed a bug where the file-writer will not abandon an alternative command topic if it fails to start a file-writing job.
- The f142 value statistics written to file is now done so according to the NeXus format.
- Added the HDF5/NeXus file structure to the "writing finished"-message and a system test to check this.


## Version 4.0.0: Many, many changes
Expand Down
6 changes: 4 additions & 2 deletions src/CommandSystem/Handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,10 @@ void Handler::revertCommandTopic() {
}

void Handler::sendHasStoppedMessage(std::string FileName,
std::string Metadata) {
nlohmann::json Metadata) {
Metadata["hdf_structure"] = NexusStructure;
CommandResponse->publishStoppedMsg(ActionResult::Success, JobId, "", FileName,
Metadata);
Metadata.dump());
PollForJob = true;
revertCommandTopic();
}
Expand Down Expand Up @@ -229,6 +230,7 @@ void Handler::handleStartCommand(FileWriter::Msg CommandMsg,
JobId = StartJob.JobID;
PollForJob = false;
JobPool->disconnectFromPool();
NexusStructure = StartJob.NexusStructure;
} catch (std::exception const &E) {
PollForJob = true;
JobId = "not_currently_writing";
Expand Down
5 changes: 3 additions & 2 deletions src/CommandSystem/Handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class HandlerBase {
registerIsWritingFunction(IsWritingFuncType IsWritingFunction) = 0;

virtual void sendHasStoppedMessage(std::string FileName,
std::string Metadata) = 0;
nlohmann::json Metadata) = 0;
virtual void sendErrorEncounteredMessage(std::string FileName,
std::string Metadata,
std::string ErrorMessage) = 0;
Expand All @@ -61,7 +61,7 @@ class Handler : public HandlerBase {
void registerIsWritingFunction(IsWritingFuncType IsWritingFunction) override;

void sendHasStoppedMessage(std::string FileName,
std::string Metadata) override;
nlohmann::json Metadata) override;
void sendErrorEncounteredMessage(std::string FileName, std::string Metadata,
std::string ErrorMessage) override;

Expand All @@ -73,6 +73,7 @@ class Handler : public HandlerBase {
void handleStopCommand(FileWriter::Msg CommandMsg);
std::string const ServiceId;
std::string JobId;
std::string NexusStructure;
StartFuncType DoStart{
[](auto) { throw std::runtime_error("Not implemented."); }};
StopTimeFuncType DoSetStopTime{
Expand Down
2 changes: 1 addition & 1 deletion src/Master.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ void Master::setToIdle() {
}
CurrentJSONStatus.update(StaticMetaData);
CommandAndControl->sendHasStoppedMessage(CurrentFileName,
CurrentJSONStatus.dump());
CurrentJSONStatus);
}
CurrentStreamController.reset(nullptr);
CurrentState = WriterState::Idle;
Expand Down
3 changes: 2 additions & 1 deletion src/tests/MasterTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ class CommandHandlerStandIn : public Command::HandlerBase {
MAKE_MOCK1(registerIsWritingFunction, void(Command::IsWritingFuncType),
override);

MAKE_MOCK2(sendHasStoppedMessage, void(std::string, std::string), override);
MAKE_MOCK2(sendHasStoppedMessage, void(std::string, nlohmann::json),
override);
MAKE_MOCK3(sendErrorEncounteredMessage,
void(std::string, std::string, std::string), override);

Expand Down
2 changes: 1 addition & 1 deletion system-tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ def start_file_writer(request):


@pytest.fixture(scope="function", autouse=True)
def writer_channel(request):
def writer_channel(request) -> WorkerCommandChannel:
"""
:type request: _pytest.python.FixtureRequest
"""
Expand Down
2 changes: 1 addition & 1 deletion system-tests/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ h5py>=3.1.0
flatbuffers>=1.12
black==19.3b0
ess-streaming_data_types>=0.13.1
file-writer-control>=1.1.7
file-writer-control>=1.2.0
41 changes: 41 additions & 0 deletions system-tests/test_filewriter_kafka_meta_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from helpers.kafkahelpers import (
create_producer,
)
from helpers.nexushelpers import OpenNexusFile
from datetime import datetime, timedelta
import numpy as np
from file_writer_control.WriteJob import WriteJob
from helpers.writer import (
wait_start_job,
wait_writers_available,
wait_no_working_writers,
)


def test_static_data_reaches_file(writer_channel, worker_pool, kafka_address):
wait_writers_available(writer_channel, nr_of=1, timeout=10)
now = datetime.now()
start_time = now - timedelta(seconds=10)
stop_time = now
file_name = "output_file_kafka_meta_data.nxs"
with open("commands/nexus_structure_static.json", "r") as f:
structure = f.read()
write_job = WriteJob(
nexus_structure=structure,
file_name=file_name,
broker=kafka_address,
start_time=start_time,
stop_time=stop_time,
)
wait_start_job(worker_pool, write_job, timeout=20)

wait_no_working_writers(writer_channel, timeout=30)
current_jobs = writer_channel.list_known_jobs()
for c_job in current_jobs:
if c_job.job_id == write_job.job_id:
assert "extra" in c_job.metadata
assert "hdf_structure" in c_job.metadata
assert c_job.metadata["hdf_structure"] == structure
return

assert False, "Unable to find current job."

0 comments on commit 7f3d18c

Please sign in to comment.