From 53047b17da57c802ae6bd67f111b92d6c940c470 Mon Sep 17 00:00:00 2001 From: Daniel Chaffelson Date: Thu, 10 Dec 2020 22:24:35 +0000 Subject: [PATCH] fixes #229 Found that importing a versioned flow was stripping out fields added in newer versions of NiFi. Also found that older versions of registry do not support these Fields and were also being silently stripped, implemented version checks and warnings for old registry or NiFi not supporting Parameters. --- nipyapi/utils.py | 36 +++++++++++--- nipyapi/versioning.py | 33 +++++-------- resources/docker/tox-full/docker-compose.yml | 14 ++++++ tests/conftest.py | 4 +- tests/test_versioning.py | 51 ++++++++++++++++++-- 5 files changed, 105 insertions(+), 33 deletions(-) diff --git a/nipyapi/utils.py b/nipyapi/utils.py index 089c4777..49cbc84b 100644 --- a/nipyapi/utils.py +++ b/nipyapi/utils.py @@ -28,7 +28,8 @@ 'wait_to_complete', 'is_endpoint_up', 'set_endpoint', 'start_docker_containers', 'DockerContainer', 'infer_object_label_from_class', 'bypass_slash_encoding', - 'exception_handler', 'enforce_min_ver', 'check_version' + 'exception_handler', 'enforce_min_ver', 'check_version', + 'validate_parameters_versioning_support' ] log = logging.getLogger(__name__) @@ -465,7 +466,7 @@ def strip_snapshot(java_version): def check_version(base, comparator=None, service='nifi'): """ - Compares version 'a' against either version 'b', or the version of the + Compares version base against either version comparator, or the version of the currently connected service instance. Since NiFi is java, it may return a version with -SNAPSHOT as part of it. @@ -478,20 +479,34 @@ def check_version(base, comparator=None, service='nifi'): service (str): The service to test the version against, currently only supports NiFi - Returns (int): -1 if a is lower, 0 if equal, and 1 if newer + Returns (int): -1 if base is lower, 0 if equal, and 1 if newer than comparator """ assert isinstance(base, six.string_types) assert comparator is None or isinstance(comparator, six.string_types) - assert service == 'nifi' + assert service in ['nifi', 'registry'] # This call currently only supports NiFi ver_a = version.parse(base) if comparator: # if b is set, we compare the passed versions comparator = strip_snapshot(comparator) ver_b = version.parse(comparator) + elif service == 'registry': + try: + reg_swagger_def = nipyapi.registry.ApiClient().call_api( + '/swagger/swagger.json', 'GET', _preload_content=False, + auth_settings=nipyapi.config.registry_config.enabled_auth + ) + reg_json = load(reg_swagger_def[0].data) + ver_b = version.parse(reg_json['info']['version']) + except nipyapi.registry.rest.ApiException as e: + if e.status == 404: + log.warning("Unable to retrieve swagger.json from registry to check version, assuming older than 0.3") + ver_b = version.parse('0.2.0') + else: + raise else: - # if b not set, we compare a against the connected nifi instance + # Working with NiFi ver_b = version.parse(strip_snapshot(nipyapi.system.get_nifi_version_info().ni_fi_version)) if ver_b > ver_a: return -1 @@ -500,18 +515,25 @@ def check_version(base, comparator=None, service='nifi'): return 0 -def enforce_min_ver(min_version, bool_response=False): +def validate_parameters_versioning_support(): + if enforce_min_ver('1.10', bool_response=True) or enforce_min_ver('0.6', service='registry', bool_response=True): + log.warning("Connected NiFi Registry does not support Parameter Contexts and they will be lost in " + "Version Control".format()) + + +def enforce_min_ver(min_version, bool_response=False, service='nifi'): """ Raises an error if target NiFi environment is not minimum version Args: min_version (str): Version to check against bool_response (bool): If True, will return True instead of raising error + service: nifi or registry Returns: (bool) or (NotImplementedError) """ - if check_version(min_version) == 1: + if check_version(min_version, service=service) == 1: if not bool_response: raise NotImplementedError( "This function is not available " diff --git a/nipyapi/versioning.py b/nipyapi/versioning.py index da8a264d..0addbee5 100644 --- a/nipyapi/versioning.py +++ b/nipyapi/versioning.py @@ -240,36 +240,25 @@ def save_flow_ver(process_group, registry_client, bucket, flow_name=None, target_pg = nipyapi.canvas.get_process_group(process_group.id, 'id') else: target_pg = process_group - if nipyapi.utils.check_version('1.10.0') <= 0: - body = nipyapi.nifi.StartVersionControlRequestEntity( - process_group_revision=target_pg.revision, - versioned_flow=nipyapi.nifi.VersionedFlowDTO( + flow_dto = nipyapi.nifi.VersionedFlowDTO( bucket_id=bucket.identifier, comments=comment, description=desc, flow_name=flow_name, flow_id=flow_id, - registry_id=registry_client.id, - action='FORCE_COMMIT' if force else 'COMMIT' + registry_id=registry_client.id ) - ) - else: + if nipyapi.utils.check_version('1.10.0') <= 0: # no 'action' property in versions < 1.10 - body = nipyapi.nifi.StartVersionControlRequestEntity( - process_group_revision=target_pg.revision, - versioned_flow={ - 'bucketId': bucket.identifier, - 'comments': comment, - 'description': desc, - 'flowName': flow_name, - 'flowId': flow_id, - 'registryId': registry_client.id - } - ) + flow_dto.action = 'FORCE_COMMIT' if force else 'COMMIT' with nipyapi.utils.rest_exceptions(): + nipyapi.utils.validate_parameters_versioning_support() return nipyapi.nifi.VersionsApi().save_to_flow_registry( id=target_pg.id, - body=body + body=nipyapi.nifi.StartVersionControlRequestEntity( + process_group_revision=target_pg.revision, + versioned_flow=flow_dto + ) ) @@ -523,6 +512,7 @@ def create_flow_version(flow, flow_snapshot, refresh=True): for obj in [target_bucket, target_flow]: for p in bad_params: obj.__setattr__(p, None) + nipyapi.utils.validate_parameters_versioning_support() return nipyapi.registry.BucketFlowsApi().create_flow_version( bucket_id=target_bucket.identifier, flow_id=target_flow.identifier, @@ -530,6 +520,8 @@ def create_flow_version(flow, flow_snapshot, refresh=True): flow=target_flow, bucket=target_bucket, flow_contents=flow_snapshot.flow_contents, + parameter_contexts=flow_snapshot.parameter_contexts, + external_controller_services=flow_snapshot.external_controller_services, snapshot_metadata=VfsMd( version=target_flow.version_count + 1, comments=flow_snapshot.snapshot_metadata.comments, @@ -689,6 +681,7 @@ def import_flow_version(bucket_id, encoded_flow=None, file_path=None, " add this version to, or flow_name must be a unique " "name for a flow in this bucket, but not both") # Now write the new version + nipyapi.utils.validate_parameters_versioning_support() return create_flow_version( flow=ver_flow, flow_snapshot=imported_flow, diff --git a/resources/docker/tox-full/docker-compose.yml b/resources/docker/tox-full/docker-compose.yml index 2c60fc41..6dda53be 100644 --- a/resources/docker/tox-full/docker-compose.yml +++ b/resources/docker/tox-full/docker-compose.yml @@ -30,6 +30,12 @@ services: hostname: nifi ports: - "8080:8080" + nifidev: + image: apache/nifi:1.12.1 + container_name: nifidev + hostname: nifidev + ports: + - "8081:8080" registry-010: image: apache/nifi-registry:0.1.0 container_name: registry-010 @@ -54,3 +60,11 @@ services: - "18080:18080" environment: - NIFI_REGISTRY_WEB_HTTP_PORT=18080 + registrydev: + image: apache/nifi-registry:0.7.0 + container_name: registrydev + hostname: registrydev + ports: + - "18081:18081" + environment: + - NIFI_REGISTRY_WEB_HTTP_PORT=18081 diff --git a/tests/conftest.py b/tests/conftest.py index d6b466ed..1588a4c8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -594,8 +594,8 @@ def fixture_flow_serde(request, tmpdir, fix_ver_flow): 'FixtureFlowSerde', getattr(fix_ver_flow, '_fields') + ('filepath', 'json', 'yaml', 'raw') ) - f_filepath = str(tmpdir.mkdir(test_ver_export_tmpdir)\ - .join(test_ver_export_filename)) + f_filepath = str(tmpdir.mkdir(test_ver_export_tmpdir) + .join(test_ver_export_filename)) f_raw = nipyapi.versioning.get_flow_version( bucket_id=fix_ver_flow.bucket.identifier, flow_id=fix_ver_flow.flow.identifier, diff --git a/tests/test_versioning.py b/tests/test_versioning.py index 02a8b215..a950de0b 100644 --- a/tests/test_versioning.py +++ b/tests/test_versioning.py @@ -7,7 +7,7 @@ import pytest from deepdiff import DeepDiff from tests import conftest -from nipyapi import registry, nifi, versioning, canvas, utils, config +from nipyapi import registry, nifi, versioning, canvas, utils, config, parameters def test_create_registry_client(regress_flow_reg): @@ -139,7 +139,7 @@ def test_save_flow_ver(regress_flow_reg, fix_bucket, fix_pg, fix_proc): ) assert isinstance(r2, nifi.VersionControlInformationEntity) assert r2.version_control_information.version > \ - r1.version_control_information.version + r1.version_control_information.version with pytest.raises(ValueError): _ = versioning.save_flow_ver( process_group=f_pg, @@ -188,7 +188,7 @@ def test_get_flow_in_bucket(regress_flow_reg, fix_ver_flow): 'id' ) assert isinstance(r1, registry.VersionedFlow) - assert r1.identifier == fix_ver_flow.info.version_control_information.\ + assert r1.identifier == fix_ver_flow.info.version_control_information. \ flow_id r2 = versioning.get_flow_in_bucket(fix_ver_flow.bucket.identifier, 'fakenews', 'id') @@ -394,10 +394,53 @@ def test_import_flow_version(regress_flow_reg, fix_flow_serde): ) == {} +def test_issue_229(regress_flow_reg, fix_bucket, fix_pg, fix_context): + # test we can deploy and imported flow, issue 229 + if utils.enforce_min_ver('1.10.0', bool_response=True) or utils.enforce_min_ver('0.6.0', service='registry', + bool_response=True): + pass + else: + reg_client = conftest.ensure_registry_client(config.registry_local_name) + bucket = fix_bucket() + pg = fix_pg.generate() + context = fix_context.generate() + parameters.assign_context_to_process_group(pg, context.id) + save_flow_ver = versioning.save_flow_ver( + process_group=pg, + registry_client=reg_client, + bucket=bucket, + flow_name=conftest.test_versioned_flow_name, + comment='NiPyApi Test', + desc='NiPyApi Test' + ) + flow_raw = versioning.get_flow_version( + bucket_id=bucket.identifier, + flow_id=save_flow_ver.version_control_information.flow_id, + export=True + ) + # Check that it is being exported correctly + # Older versions of Registry will drop unsupported parameterContext information + if 'parameterContexts' in utils.load(flow_raw).keys(): + imported_flow = versioning.import_flow_version( + bucket_id=bucket.identifier, + encoded_flow=flow_raw, + flow_name=conftest.test_versioned_flow_name + '_229' + ) + deployed_flow = versioning.deploy_flow_version( + parent_id=canvas.get_root_pg_id(), + location=(0, 0), + bucket_id=bucket.identifier, + flow_id=imported_flow.flow.identifier, + reg_client_id=reg_client.id, + version=None + ) + assert isinstance(deployed_flow, nifi.ProcessGroupEntity) + + def test_deploy_flow_version(regress_flow_reg, fix_ver_flow): r1 = versioning.deploy_flow_version( parent_id=canvas.get_root_pg_id(), - location=(0,0), + location=(0, 0), bucket_id=fix_ver_flow.bucket.identifier, flow_id=fix_ver_flow.flow.identifier, reg_client_id=fix_ver_flow.client.id,