Skip to content

Commit

Permalink
fixes #229
Browse files Browse the repository at this point in the history
Found that importing a versioned flow was stripping out fields added in newer versions of NiFi.
Also found that older versions of registry do not support these Fields and were also being silently stripped, implemented version checks and warnings for old registry or NiFi not supporting Parameters.
  • Loading branch information
Chaffelson committed Dec 10, 2020
1 parent 39c0723 commit 53047b1
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 33 deletions.
36 changes: 29 additions & 7 deletions nipyapi/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
'wait_to_complete', 'is_endpoint_up', 'set_endpoint',
'start_docker_containers', 'DockerContainer',
'infer_object_label_from_class', 'bypass_slash_encoding',
'exception_handler', 'enforce_min_ver', 'check_version'
'exception_handler', 'enforce_min_ver', 'check_version',
'validate_parameters_versioning_support'
]

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -465,7 +466,7 @@ def strip_snapshot(java_version):

def check_version(base, comparator=None, service='nifi'):
"""
Compares version 'a' against either version 'b', or the version of the
Compares version base against either version comparator, or the version of the
currently connected service instance.
Since NiFi is java, it may return a version with -SNAPSHOT as part of it.
Expand All @@ -478,20 +479,34 @@ def check_version(base, comparator=None, service='nifi'):
service (str): The service to test the version against, currently
only supports NiFi
Returns (int): -1 if a is lower, 0 if equal, and 1 if newer
Returns (int): -1 if base is lower, 0 if equal, and 1 if newer than comparator
"""
assert isinstance(base, six.string_types)
assert comparator is None or isinstance(comparator, six.string_types)
assert service == 'nifi'
assert service in ['nifi', 'registry']
# This call currently only supports NiFi
ver_a = version.parse(base)
if comparator:
# if b is set, we compare the passed versions
comparator = strip_snapshot(comparator)
ver_b = version.parse(comparator)
elif service == 'registry':
try:
reg_swagger_def = nipyapi.registry.ApiClient().call_api(
'/swagger/swagger.json', 'GET', _preload_content=False,
auth_settings=nipyapi.config.registry_config.enabled_auth
)
reg_json = load(reg_swagger_def[0].data)
ver_b = version.parse(reg_json['info']['version'])
except nipyapi.registry.rest.ApiException as e:
if e.status == 404:
log.warning("Unable to retrieve swagger.json from registry to check version, assuming older than 0.3")
ver_b = version.parse('0.2.0')
else:
raise
else:
# if b not set, we compare a against the connected nifi instance
# Working with NiFi
ver_b = version.parse(strip_snapshot(nipyapi.system.get_nifi_version_info().ni_fi_version))
if ver_b > ver_a:
return -1
Expand All @@ -500,18 +515,25 @@ def check_version(base, comparator=None, service='nifi'):
return 0


def enforce_min_ver(min_version, bool_response=False):
def validate_parameters_versioning_support():
if enforce_min_ver('1.10', bool_response=True) or enforce_min_ver('0.6', service='registry', bool_response=True):
log.warning("Connected NiFi Registry does not support Parameter Contexts and they will be lost in "
"Version Control".format())


def enforce_min_ver(min_version, bool_response=False, service='nifi'):
"""
Raises an error if target NiFi environment is not minimum version
Args:
min_version (str): Version to check against
bool_response (bool): If True, will return True instead of
raising error
service: nifi or registry
Returns:
(bool) or (NotImplementedError)
"""
if check_version(min_version) == 1:
if check_version(min_version, service=service) == 1:
if not bool_response:
raise NotImplementedError(
"This function is not available "
Expand Down
33 changes: 13 additions & 20 deletions nipyapi/versioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,36 +240,25 @@ def save_flow_ver(process_group, registry_client, bucket, flow_name=None,
target_pg = nipyapi.canvas.get_process_group(process_group.id, 'id')
else:
target_pg = process_group
if nipyapi.utils.check_version('1.10.0') <= 0:
body = nipyapi.nifi.StartVersionControlRequestEntity(
process_group_revision=target_pg.revision,
versioned_flow=nipyapi.nifi.VersionedFlowDTO(
flow_dto = nipyapi.nifi.VersionedFlowDTO(
bucket_id=bucket.identifier,
comments=comment,
description=desc,
flow_name=flow_name,
flow_id=flow_id,
registry_id=registry_client.id,
action='FORCE_COMMIT' if force else 'COMMIT'
registry_id=registry_client.id
)
)
else:
if nipyapi.utils.check_version('1.10.0') <= 0:
# no 'action' property in versions < 1.10
body = nipyapi.nifi.StartVersionControlRequestEntity(
process_group_revision=target_pg.revision,
versioned_flow={
'bucketId': bucket.identifier,
'comments': comment,
'description': desc,
'flowName': flow_name,
'flowId': flow_id,
'registryId': registry_client.id
}
)
flow_dto.action = 'FORCE_COMMIT' if force else 'COMMIT'
with nipyapi.utils.rest_exceptions():
nipyapi.utils.validate_parameters_versioning_support()
return nipyapi.nifi.VersionsApi().save_to_flow_registry(
id=target_pg.id,
body=body
body=nipyapi.nifi.StartVersionControlRequestEntity(
process_group_revision=target_pg.revision,
versioned_flow=flow_dto
)
)


Expand Down Expand Up @@ -523,13 +512,16 @@ def create_flow_version(flow, flow_snapshot, refresh=True):
for obj in [target_bucket, target_flow]:
for p in bad_params:
obj.__setattr__(p, None)
nipyapi.utils.validate_parameters_versioning_support()
return nipyapi.registry.BucketFlowsApi().create_flow_version(
bucket_id=target_bucket.identifier,
flow_id=target_flow.identifier,
body=nipyapi.registry.VersionedFlowSnapshot(
flow=target_flow,
bucket=target_bucket,
flow_contents=flow_snapshot.flow_contents,
parameter_contexts=flow_snapshot.parameter_contexts,
external_controller_services=flow_snapshot.external_controller_services,
snapshot_metadata=VfsMd(
version=target_flow.version_count + 1,
comments=flow_snapshot.snapshot_metadata.comments,
Expand Down Expand Up @@ -689,6 +681,7 @@ def import_flow_version(bucket_id, encoded_flow=None, file_path=None,
" add this version to, or flow_name must be a unique "
"name for a flow in this bucket, but not both")
# Now write the new version
nipyapi.utils.validate_parameters_versioning_support()
return create_flow_version(
flow=ver_flow,
flow_snapshot=imported_flow,
Expand Down
14 changes: 14 additions & 0 deletions resources/docker/tox-full/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ services:
hostname: nifi
ports:
- "8080:8080"
nifidev:
image: apache/nifi:1.12.1
container_name: nifidev
hostname: nifidev
ports:
- "8081:8080"
registry-010:
image: apache/nifi-registry:0.1.0
container_name: registry-010
Expand All @@ -54,3 +60,11 @@ services:
- "18080:18080"
environment:
- NIFI_REGISTRY_WEB_HTTP_PORT=18080
registrydev:
image: apache/nifi-registry:0.7.0
container_name: registrydev
hostname: registrydev
ports:
- "18081:18081"
environment:
- NIFI_REGISTRY_WEB_HTTP_PORT=18081
4 changes: 2 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,8 +594,8 @@ def fixture_flow_serde(request, tmpdir, fix_ver_flow):
'FixtureFlowSerde',
getattr(fix_ver_flow, '_fields') + ('filepath', 'json', 'yaml', 'raw')
)
f_filepath = str(tmpdir.mkdir(test_ver_export_tmpdir)\
.join(test_ver_export_filename))
f_filepath = str(tmpdir.mkdir(test_ver_export_tmpdir)
.join(test_ver_export_filename))
f_raw = nipyapi.versioning.get_flow_version(
bucket_id=fix_ver_flow.bucket.identifier,
flow_id=fix_ver_flow.flow.identifier,
Expand Down
51 changes: 47 additions & 4 deletions tests/test_versioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pytest
from deepdiff import DeepDiff
from tests import conftest
from nipyapi import registry, nifi, versioning, canvas, utils, config
from nipyapi import registry, nifi, versioning, canvas, utils, config, parameters


def test_create_registry_client(regress_flow_reg):
Expand Down Expand Up @@ -139,7 +139,7 @@ def test_save_flow_ver(regress_flow_reg, fix_bucket, fix_pg, fix_proc):
)
assert isinstance(r2, nifi.VersionControlInformationEntity)
assert r2.version_control_information.version > \
r1.version_control_information.version
r1.version_control_information.version
with pytest.raises(ValueError):
_ = versioning.save_flow_ver(
process_group=f_pg,
Expand Down Expand Up @@ -188,7 +188,7 @@ def test_get_flow_in_bucket(regress_flow_reg, fix_ver_flow):
'id'
)
assert isinstance(r1, registry.VersionedFlow)
assert r1.identifier == fix_ver_flow.info.version_control_information.\
assert r1.identifier == fix_ver_flow.info.version_control_information. \
flow_id
r2 = versioning.get_flow_in_bucket(fix_ver_flow.bucket.identifier,
'fakenews', 'id')
Expand Down Expand Up @@ -394,10 +394,53 @@ def test_import_flow_version(regress_flow_reg, fix_flow_serde):
) == {}


def test_issue_229(regress_flow_reg, fix_bucket, fix_pg, fix_context):
# test we can deploy and imported flow, issue 229
if utils.enforce_min_ver('1.10.0', bool_response=True) or utils.enforce_min_ver('0.6.0', service='registry',
bool_response=True):
pass
else:
reg_client = conftest.ensure_registry_client(config.registry_local_name)
bucket = fix_bucket()
pg = fix_pg.generate()
context = fix_context.generate()
parameters.assign_context_to_process_group(pg, context.id)
save_flow_ver = versioning.save_flow_ver(
process_group=pg,
registry_client=reg_client,
bucket=bucket,
flow_name=conftest.test_versioned_flow_name,
comment='NiPyApi Test',
desc='NiPyApi Test'
)
flow_raw = versioning.get_flow_version(
bucket_id=bucket.identifier,
flow_id=save_flow_ver.version_control_information.flow_id,
export=True
)
# Check that it is being exported correctly
# Older versions of Registry will drop unsupported parameterContext information
if 'parameterContexts' in utils.load(flow_raw).keys():
imported_flow = versioning.import_flow_version(
bucket_id=bucket.identifier,
encoded_flow=flow_raw,
flow_name=conftest.test_versioned_flow_name + '_229'
)
deployed_flow = versioning.deploy_flow_version(
parent_id=canvas.get_root_pg_id(),
location=(0, 0),
bucket_id=bucket.identifier,
flow_id=imported_flow.flow.identifier,
reg_client_id=reg_client.id,
version=None
)
assert isinstance(deployed_flow, nifi.ProcessGroupEntity)


def test_deploy_flow_version(regress_flow_reg, fix_ver_flow):
r1 = versioning.deploy_flow_version(
parent_id=canvas.get_root_pg_id(),
location=(0,0),
location=(0, 0),
bucket_id=fix_ver_flow.bucket.identifier,
flow_id=fix_ver_flow.flow.identifier,
reg_client_id=fix_ver_flow.client.id,
Expand Down

0 comments on commit 53047b1

Please sign in to comment.