
Merge pull request #676 from ess-dmsc/ECDC-3237_add_support_to_process_commands_starting_at_timestamp

ECDC-3237 Add support to process commands starting at timestamp
danesss authored Feb 20, 2023
2 parents 4a5ad18 + bcf0ebc commit 95fad77
Showing 19 changed files with 426 additions and 158 deletions.
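For orientation before the per-file diffs: the new integration tests in this commit drive the file writer through the `file_writer_control` library, passing explicit `start_time`/`stop_time` values on each write job (the pl72 `start_time` field that the changelog below marks as required). The following is a minimal, hypothetical sketch of that client-side pattern; the wrapper function and the concrete timeout values are illustrative and not part of this commit.

```python
# Hypothetical sketch (not part of this commit): start a write job with
# explicit start/stop timestamps using the same file_writer_control helpers
# the new integration tests rely on.
from datetime import datetime, timedelta

from file_writer_control import WriteJob
from helpers.writer import wait_no_working_writers, wait_start_job


def write_last_ten_minutes(worker_pool, kafka_address, structure, file_path):
    now = datetime.now()
    write_job = WriteJob(
        nexus_structure=structure,
        file_name=file_path,
        broker=kafka_address,
        start_time=now - timedelta(minutes=10),  # process data from 10 minutes ago...
        stop_time=now,                           # ...up to "now"
    )
    wait_start_job(worker_pool, write_job, timeout=20)  # blocks until the job has started
    wait_no_working_writers(worker_pool, timeout=60)    # blocks until writing has finished
```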
21 changes: 16 additions & 5 deletions changes.md
@@ -2,21 +2,31 @@

## Next version

- Redact Kafka SASL password in logs.
- Ignore deprecated warnings on macOS (can be removed when https://github.com/chriskohlhoff/asio/issues/1183 is addressed).
-

## Version 6.0.0

- Breaking: Run start messages (pl72 schema) now require a `start_time` field.
- Breaking: Ignore Kafka IP addresses sent in `StartJob` messages, experiment
data is now fetched from the broker configured in `job-pool-uri`.
- The case when messages are not received from a specific Kafka topic does not make the file writer unsubscribe from the topic anymore. Instead a warning is provided in the file writer log.
- Updated Conan package dependencies:
  - librdkafka (1.9.2)
- Commands received without an explicit kafka-to-nexus `service_id` are no
longer skipped. The command is processed by all workers and accepted by the
worker with a matching `job_id`.
- Fix: The case when messages are not received from a specific Kafka topic does not
make the file writer unsubscribe from the topic anymore. Instead a warning is
provided in the file writer log.
- Fix: Stop commands sent immediately after a start command were not always processed.
- Fix: Redact Kafka SASL password in logs.
- Adding _f144_, _al00_ and _ep01_ writer modules. For more information on the schemas mentioned,
  see the [schema definitions here](https://github.com/ess-dmsc/streaming-data-types).
- Adding _se00_ writer module (see [schema definitions here](https://github.com/ess-dmsc/streaming-data-types)).
- Adding _ev44_ writer module (see [schema definitions here](https://github.com/ess-dmsc/streaming-data-types)).
- Ignore deprecated warnings on macOS (can be removed when https://github.com/chriskohlhoff/asio/issues/1183 is addressed).
- Enable idempotence setting in the Kafka producer.
- Updated librdkafka Conan package version to 2.0.2.



## Version 5.2.0: Kafka improvements and other fixes

- Increased kafka message buffer sizes and added integration tests for this.
@@ -29,6 +39,7 @@
- Fix to make all Kafka connections honour the provided librdkafka parameters.
- Silencing x5f2 schema message and file writer not currently writing status message.


## Version 5.1.0: Attributes and dependencies

- Renamed system tests to integration tests.
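One behavioural change noted in the changelog above — a stop command without an explicit `service_id` is no longer skipped, but is picked up by the worker whose `job_id` matches — is exercised by the new integration test `test_accepts_stop_command_with_empty_service_id` later in this diff. A condensed, hypothetical sketch of the client-side call sequence (timeout value illustrative):

```python
# Hypothetical sketch: stop a running job without naming a service_id.
# Mirrors the calls used in test_accepts_stop_command_with_empty_service_id.
from datetime import timedelta

# write_job was started earlier, e.g. via wait_start_job(worker_pool, write_job, timeout=20)
stop_handler = worker_pool.try_send_stop_now(None, write_job.job_id)  # no service_id given
stop_handler.set_timeout(timedelta(seconds=5))
# The worker with a matching job_id accepts the command; the job then reports DONE.
```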
2 changes: 1 addition & 1 deletion docker_launch.sh
@@ -8,7 +8,7 @@ FOUND=false

for I in {1..10}
do
if $KAFKA_CMD | grep -q "TEST_writer_commands" && $KAFKA_CMD | grep -q "TEST_writer_jobs"; then
if $KAFKA_CMD | grep -q "TEST_writer_commands_alternative" && $KAFKA_CMD | grep -q "TEST_writer_commands" && $KAFKA_CMD | grep -q "TEST_writer_jobs"; then
FOUND=true
break
else
18 changes: 11 additions & 7 deletions integration-tests/conftest.py
@@ -95,7 +95,11 @@ def delivery_callback(err, msg):
n_polls = 0
while n_polls < 10 and not topic_ready:
all_topics = client.list_topics().topics.keys()
if "TEST_writer_jobs" in all_topics and "TEST_writer_commands" in all_topics:
if (
"TEST_writer_jobs" in all_topics
and "TEST_writer_commands" in all_topics
and "TEST_writer_commands_alternative" in all_topics
):
topic_ready = True
print("Topic is ready!", flush=True)
break
@@ -172,18 +176,18 @@ def build_and_run(
time.sleep(10)

def fin():
# Stop the containers then remove them and their volumes (--volumes option)
if not custom_kafka_broker:
print("Stopping docker containers", flush=True)
options["--timeout"] = 30
cmd.down(options)
print("Containers stopped", flush=True)
print("Stopping file-writers")
for fw in list_of_writers:
fw.terminate()
for fw in list_of_writers:
fw.wait()
print("File-writers stopped")
# Stop the containers then remove them and their volumes (--volumes option)
if not custom_kafka_broker:
print("Stopping docker containers", flush=True)
options["--timeout"] = 10
cmd.down(options)
print("Containers stopped", flush=True)

# Using a finalizer rather than yield in the fixture means
# that the containers will be brought down even if tests fail
2 changes: 1 addition & 1 deletion integration-tests/docker-compose.yml
@@ -16,7 +16,7 @@ services:
KAFKA_REPLICA_FETCH_MAX_BYTES: 300000000
KAFKA_BROKER_ID: 0
KAFKA_LOG_RETENTION_MS: -1 # keep data forever, required for tests involving fake "historical" data
KAFKA_CREATE_TOPICS: "TEST_epicsConnectionStatus:1:1,TEST_sampleEnv:1:1,TEST_writer_jobs:1:1,TEST_writer_commands:1:1,TEST_forwarderConfig:1:1,TEST_forwarderStatusLR:1:1,TEST_forwarderDataLR:1:1"
KAFKA_CREATE_TOPICS: "TEST_epicsConnectionStatus:1:1,TEST_sampleEnv:1:1,TEST_writer_jobs:1:1,TEST_writer_commands:1:1,TEST_writer_commands_alternative:1:1,TEST_forwarderConfig:1:1,TEST_forwarderStatusLR:1:1,TEST_forwarderDataLR:1:1"
depends_on:
- wait_for_zookeeper

33 changes: 16 additions & 17 deletions integration-tests/helpers/writer.py
@@ -1,6 +1,7 @@
from file_writer_control import WorkerJobPool
from file_writer_control.WorkerStatus import WorkerState
from file_writer_control import JobState
from file_writer_control import CommandHandler
from file_writer_control import CommandState
from file_writer_control import WriteJob
from file_writer_control import JobHandler
@@ -43,37 +44,35 @@ def wait_no_working_writers(worker_pool: WorkerJobPool, timeout: float):
raise RuntimeError("Timed out when waiting for workers to finish")


def wait_start_job(worker_pool: WorkerJobPool, write_job: WriteJob, timeout: float):
job_handler = JobHandler(worker_finder=worker_pool)
start_handler = job_handler.start_job(write_job)
def wait_command_is_done(command_handler: CommandHandler, timeout: float):
start_time = datetime.now()
try:
while not start_handler.is_done():
while not command_handler.is_done():
if start_time + timedelta(seconds=timeout) < datetime.now():
raise RuntimeError("Timed out when waiting for write job to start")
elif start_handler.get_state() == CommandState.ERROR:
raise RuntimeError("Timed out when waiting for command to finish")
elif command_handler.get_state() == CommandState.ERROR:
raise RuntimeError(
f"Got error when trying to start job. Message was: {start_handler.get_message()}"
f"Command failed. Message was: {command_handler.get_message()}"
)
time.sleep(0.5)
except RuntimeError as e:
raise RuntimeError(
e.__str__() + f" The message was: {start_handler.get_message()}"
e.__str__() + f" The message was: {command_handler.get_message()}"
)


def wait_start_job(
worker_pool: WorkerJobPool, write_job: WriteJob, timeout: float
) -> JobHandler:
job_handler = JobHandler(worker_finder=worker_pool)
start_handler = job_handler.start_job(write_job)
wait_command_is_done(start_handler, timeout)
return job_handler


def wait_set_stop_now(job: JobHandler, timeout: float):
stop_handler = job.set_stop_time(datetime.now())
start_time = datetime.now()
while not stop_handler.is_done():
if start_time + timedelta(seconds=timeout) < datetime.now():
raise RuntimeError("Timed out when setting new stop time for job.")
elif stop_handler.get_state() == CommandState.ERROR:
raise RuntimeError(
f"Got error when trying to stop job. Message was: {stop_handler.get_message()}"
)
time.sleep(0.5)
wait_command_is_done(stop_handler, timeout)


def wait_fail_start_job(
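The refactor above funnels both start and stop commands through a single `wait_command_is_done` polling helper. As a hypothetical illustration of reusing it (the `wait_set_stop_at` wrapper below is not part of this commit; it follows the same `JobHandler.set_stop_time` pattern already used by `wait_set_stop_now`):

```python
# Hypothetical sketch: schedule a stop at an arbitrary timestamp and wait for
# the command to be acknowledged, reusing the shared helper from this diff.
from datetime import datetime, timedelta

from file_writer_control import JobHandler
from helpers.writer import wait_command_is_done


def wait_set_stop_at(job: JobHandler, stop_at: datetime, timeout: float):
    stop_handler = job.set_stop_time(stop_at)  # same call wait_set_stop_now uses
    wait_command_is_done(stop_handler, timeout)


# e.g. ask the writer to stop one minute from now:
# wait_set_stop_at(job_handler, datetime.now() + timedelta(minutes=1), timeout=20)
```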
2 changes: 1 addition & 1 deletion integration-tests/requirements.txt
@@ -6,4 +6,4 @@ h5py>=3.1.0
flatbuffers>=1.12
black==19.3b0
ess-streaming_data_types>=0.21.0
file-writer-control>=1.2.3
file-writer-control>=1.2.4
4 changes: 2 additions & 2 deletions integration-tests/stress.py
@@ -7,7 +7,7 @@
test_two_different_writer_modules_with_same_flatbuffer_id,
)
from test_filewriter_stop_time import test_start_and_stop_time_are_in_the_past
from test_filewriter_ignores_commands import test_ignores_commands_with_incorrect_job_id
from test_filewriter_commands import test_ignores_stop_command_with_incorrect_job_id
from test_f142_meta_data import test_f142_meta_data
from file_writer_control import WorkerJobPool

@@ -26,7 +26,7 @@ def main():
test_end_message_metadata,
test_two_different_writer_modules_with_same_flatbuffer_id,
test_start_and_stop_time_are_in_the_past,
test_ignores_commands_with_incorrect_job_id,
test_ignores_stop_command_with_incorrect_job_id,
test_f142_meta_data,
]
for func in list_of_tests:
170 changes: 170 additions & 0 deletions integration-tests/test_filewriter_commands.py
@@ -0,0 +1,170 @@
import time
from datetime import datetime, timedelta
from pathlib import Path
from file_writer_control.CommandStatus import CommandState
from file_writer_control.JobStatus import JobState
from file_writer_control.WriteJob import WriteJob
from helpers import full_file_path
from helpers.writer import (
wait_start_job,
wait_writers_available,
wait_no_working_writers,
wait_fail_start_job,
stop_all_jobs,
)


def test_ignores_stop_command_with_incorrect_service_id(
request,
worker_pool,
kafka_address,
multiple_writers,
):
file_path = full_file_path(f"{request.node.name}.nxs")
wait_writers_available(worker_pool, nr_of=2, timeout=20)
now = datetime.now()

with open("commands/nexus_structure.json", "r") as f:
structure = f.read()
write_job = WriteJob(
nexus_structure=structure,
file_name=file_path,
broker=kafka_address,
start_time=now,
stop_time=now + timedelta(days=30),
)
start_cmd_handler = wait_start_job(worker_pool, write_job, timeout=20)

stop_cmd_handler = worker_pool.try_send_stop_now(
"incorrect service id", write_job.job_id
)

used_timeout = timedelta(seconds=5)
stop_cmd_handler.set_timeout(used_timeout)

time.sleep(used_timeout.total_seconds() + 5)
assert (
stop_cmd_handler.get_state() == CommandState.TIMEOUT_RESPONSE
), f"Stop command not ignored. State was {stop_cmd_handler.get_state()} (cmd id: {stop_cmd_handler.command_id})"
assert start_cmd_handler.get_state() in [
JobState.WRITING
], f"Start job may have been affected by Stop command. State was {start_cmd_handler.get_state()} (job id: {start_cmd_handler.job_id}): {start_cmd_handler.get_message()}"

stop_all_jobs(worker_pool)
wait_no_working_writers(worker_pool, timeout=15)
assert Path(file_path).is_file()


def test_ignores_stop_command_with_incorrect_job_id(
request,
worker_pool,
kafka_address,
multiple_writers,
):
file_path = full_file_path(f"{request.node.name}.nxs")
wait_writers_available(worker_pool, nr_of=2, timeout=20)
now = datetime.now()

with open("commands/nexus_structure.json", "r") as f:
structure = f.read()
write_job = WriteJob(
nexus_structure=structure,
file_name=file_path,
broker=kafka_address,
start_time=now,
stop_time=now + timedelta(days=30),
)
start_cmd_handler = wait_start_job(worker_pool, write_job, timeout=20)

cmd_handler = worker_pool.try_send_stop_now(write_job.service_id, "wrong job id")
used_timeout = timedelta(seconds=5)
cmd_handler.set_timeout(used_timeout)

time.sleep(used_timeout.total_seconds() + 5)
assert start_cmd_handler.get_state() in [
JobState.WRITING
], f"Start job may have been affected by Stop command. State was {start_cmd_handler.get_state()} (job id: {start_cmd_handler.job_id}): {start_cmd_handler.get_message()}"

stop_all_jobs(worker_pool)
wait_no_working_writers(worker_pool, timeout=0)
assert Path(file_path).is_file()


def test_accepts_stop_command_with_empty_service_id(
request,
worker_pool,
kafka_address,
multiple_writers,
):
file_path = full_file_path(f"{request.node.name}.nxs")
wait_writers_available(worker_pool, nr_of=2, timeout=20)
now = datetime.now()

with open("commands/nexus_structure.json", "r") as f:
structure = f.read()
write_job = WriteJob(
nexus_structure=structure,
file_name=file_path,
broker=kafka_address,
start_time=now,
stop_time=now + timedelta(days=30),
)
start_cmd_handler = wait_start_job(worker_pool, write_job, timeout=20)

stop_cmd_handler = worker_pool.try_send_stop_now(None, write_job.job_id)

used_timeout = timedelta(seconds=5)
stop_cmd_handler.set_timeout(used_timeout)

time.sleep(used_timeout.total_seconds() + 5)
start_job_state = start_cmd_handler.get_state()
assert start_job_state in [
JobState.DONE
], f"Start job was not stopped after Stop command. State was {start_job_state} (job id: {start_cmd_handler.job_id}): {start_cmd_handler.get_message()}"

stop_all_jobs(worker_pool)
wait_no_working_writers(worker_pool, timeout=5)
assert Path(file_path).is_file()


def test_ignores_start_command_with_incorrect_job_id(
request, worker_pool, kafka_address
):
file_path = full_file_path(f"{request.node.name}.nxs")
wait_writers_available(worker_pool, nr_of=1, timeout=10)
now = datetime.now()
with open("commands/nexus_structure.json", "r") as f:
structure = f.read()
write_job = WriteJob(
nexus_structure=structure,
file_name=file_path,
broker=kafka_address,
start_time=now,
stop_time=now + timedelta(days=30),
)
write_job.job_id = "invalid id"
wait_fail_start_job(worker_pool, write_job, timeout=20)

wait_no_working_writers(worker_pool, timeout=0)
assert not Path(file_path).is_file()


def test_reject_bad_json(request, worker_pool, kafka_address):
file_path = full_file_path(f"{request.node.name}.nxs")
wait_writers_available(worker_pool, nr_of=1, timeout=10)
now = datetime.now()
start_time = now - timedelta(seconds=10)
stop_time = now
with open("commands/nexus_structure_bad_json.json", "r") as f:
structure = f.read()
write_job = WriteJob(
nexus_structure=structure,
file_name=file_path,
broker=kafka_address,
start_time=start_time,
stop_time=stop_time,
)
fail_message = wait_fail_start_job(worker_pool, write_job, timeout=20)
assert "NeXus structure JSON" in fail_message, (
'Unexpected content in "fail to start" message. Message was: ' + fail_message
)
