diff --git a/docs-website/sidebars.js b/docs-website/sidebars.js index 3a9d6e10ea8d42..6ae50215c8166f 100644 --- a/docs-website/sidebars.js +++ b/docs-website/sidebars.js @@ -149,6 +149,25 @@ module.exports = { type: "doc", id: "docs/glossary/business-glossary", }, + { + label: "Compliance Forms", + type: "category", + collapsed: true, + items: [ + { + type: "doc", + id: "docs/features/feature-guides/compliance-forms/overview", + }, + { + type: "doc", + id: "docs/features/feature-guides/compliance-forms/create-a-form", + }, + { + type: "doc", + id: "docs/features/feature-guides/compliance-forms/complete-a-form", + }, + ], + }, { label: "Data Contract", type: "doc", @@ -164,7 +183,6 @@ module.exports = { type: "doc", id: "docs/features/dataset-usage-and-query-history", }, - "docs/features/feature-guides/documentation-forms", { label: "Domains", type: "doc", diff --git a/docs/api/tutorials/forms.md b/docs/api/tutorials/forms.md index cf51f1579f1c8a..30dd4db7d8f111 100644 --- a/docs/api/tutorials/forms.md +++ b/docs/api/tutorials/forms.md @@ -1,13 +1,15 @@ import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; -# Documentation Forms +# Compliance Forms -## Why Would You Use Documentation Forms? +## Why Would You Use Compliance Forms? -Documentation Forms are a way for end-users to fill out all mandatory attributes associated with a data asset. The form will be dynamically generated based on the definitions provided by administrators and stewards and matching rules. +**DataHub Compliance Forms** streamline the process of documenting, annotating, and classifying your most critical Data Assets through a collaborative, crowdsourced approach. -Learn more about forms in the [Documentation Forms Feature Guide](../../../docs/features/feature-guides/documentation-forms.md). +With Compliance Forms, you can execute large-scale compliance initiatives by assigning tasks (e.g., documentation, tagging, or classification requirements) to the appropriate stakeholders — data owners, stewards, and subject matter experts. + +Learn more about forms in the [Compliance Forms Feature Guide](../../../docs/features/feature-guides/compliance-forms/overview.md). ### Goal Of This Guide This guide will show you how to diff --git a/docs/features/feature-guides/compliance-forms/complete-a-form.md b/docs/features/feature-guides/compliance-forms/complete-a-form.md new file mode 100644 index 00000000000000..285c722179e4d7 --- /dev/null +++ b/docs/features/feature-guides/compliance-forms/complete-a-form.md @@ -0,0 +1,177 @@ +--- +title: Complete a Form +--- + +import FeatureAvailability from '@site/src/components/FeatureAvailability'; + +# Complete a DataHub Compliance Form + + +Once a Compliance Form has been published (see [Create a Compliance Form](create-a-form.md)), Assignees will receive notifications in their Task Center prompting them to complete the Form for each Asset they are responsible for. + +This guide provides an example of completing a Compliance Form, covering: + +1. Accessing a Form from an Asset Page or the Task Center +2. Completing a Form for a single Asset or multiple Assets (DataHub Cloud only) +3. Understanding different Form Question completion states + +The example uses the **Governance Initiative 2024**, a Verification Form with 3 Required Questions: + +

+ Sample Compliance Form +

+ +## Access a Compliance Form + +Once you have been assigned to complete a Compliance Form, you will see a **Complete Documentation** or **Complete Verification** option on the right-hand side of an Asset Page: + +

+ Open Compliance Form from Asset Page +

+ +**DataHub Cloud** users can find all outstanding Compliance Form requests by navigating to the **Task Center**: + +

+ Open Compliance Form from Task Center +

+ +## Complete a Form for a Single Asset + +When filling out a Compliance Form for a single Asset, you'll see a list of Questions tailored to that Asset, with clear labels showing which ones are required. Here's how it works: + +- **Question Details:** Each Question specifies if it's required or optional. Required Questions must be completed to submit the Form. +- **Pre-Populated Metadata:** If metadata already exists for a Question, it will appear pre-filled. You can confirm the existing value or make updates as needed. +- **Assignee Contributions:** If another Assignee has already provided a response, their name and the time of submission will be displayed. This gives you visibility into previous input, though you can still update the response. + +:::tip +For Verification Forms, after addressing all required Questions, you'll be prompted to provide final sign-off. This ensures all responses are complete and accurate, marking the Form ready for submission. +::: + +Once you complete all required responses, the sidebar will update with the status of the Asset: + +- **Documented**: All required Questions are completed, Verification is not needed +- **Verified**: All required Questions are completed and Verified + +Here's what the **Governance Initiative 2024** Verification Form looks like for `dogs_in_movies` after responding to all Required Questions: + +

+ Asset Ready to Verify +

+ +And here's the `dogs_in_movies` sidebar after Verifying all responses: + +

+ Asset is Verified +

+ +### Navigate to the Next Asset + +To continue working through the Compliance Forms assigned to you, **use the navigation arrows located in the top-right corner**. These arrows will take you to the next Asset that is still pending Form completion or Verification. Only Assets that require action will appear in this flow, allowing you to focus on the remaining tasks without unnecessary steps. + +## Complete a Form Question for Multiple Assets + +When you want to provide the same response for a question to multiple assets, you can apply it in bulk by selecting the **By Question** option in the top-right corner. This allows you to navigate through the Form question-by-question and apply the same response to multiple assets. + +:::note +Completing Form Questions for multiple Assets is only supported for DataHub Cloud. +::: + +### Example: Apply a Response in Bulk + +Let's look at an example. Imagine we are trying to provide the same answer to a Question for all Assets in a Snowflake schema called `DEMO_DB`. Here's how we'd do it: + +1. **Filter Assets**: Filter down to all datasets in the `DEMO_DB` Snowflake schema. +2. **Set a Response**: For the selected Question, provide a response. In this case, we'll set the Deletion Date to be `2024-12-31`. +3. **Apply to All Selected Assets**: Use the bulk application feature to apply this response to all filtered Assets. + +

+ Apply Response to Multiple Assets +

+ +After setting the response, toggle through each Question, providing the necessary responses to combinations of Assets. + +### Verification for Multiple Assets + +For Verification Forms, as you complete Questions, you will see the number of assets eligible for Verification in the top-right corner. This makes it easy to track which Assets have met the requirements. + +

+ Multiple Assets ready to Verify +

+ +When you are ready to bulk Verify Assets, you will be prompted to confirm that all responses are complete and accurate before proceeding. + +

+ Final Bulk Verification +

+ +### Switch Between Completion Modes + +You can easily toggle between the **Complete By Asset** and **Complete By Question** views as needed, ensuring flexibility while completing and verifying the Compliance Forms. + +## Understanding Different Form Question Completion States + +When completing a Compliance Form, you may encounter various types of Questions, each with unique completion states based on existing metadata or prior responses from other Assignees. This section highlights examples of various completion states to help you understand how Questions can be answered, confirmed, or updated when completing a Form. + +**_1. What is the primary use case for this asset?_** + +This required Question is asking the Assignee to provide Documentation on how the Asset should be used. Note that there is no text populated in the description, meaning the Asset does not have any documentation at all. + +

+ Sample Compliance Form +

+ +**_2. When will this asset be deleted?_** + +You may notice that this question has a pre-populated value. When metadata has been populated from a source _outside_ of a Form, users will have the option to update and save the value, or, simply **Confirm** that the value is accurate. + +

+ Sample Compliance Form +

+ +**_3. Who is the Data Steward of this Asset?_** + +Here's an example where a different Form Assignee has already provided an answer through the Compliance Form 3 days ago. All Assignees will still have the option to update the response, but this allows users to see how other Form Assignees have already answered the questions. + +

+ Sample Compliance Form +

+ + +## FAQ and Troubleshooting + +**Why don’t I see any Compliance Forms in the Task Center or on an Asset Page?** + +If you don’t see any Compliance Forms, check with the Form author to ensure your DataHub user account has been assigned to complete a Form for one or more Assets. Forms can be assigned to Asset Owners, specific DataHub Users, or a combination of both. \ No newline at end of file diff --git a/docs/features/feature-guides/compliance-forms/create-a-form.md b/docs/features/feature-guides/compliance-forms/create-a-form.md new file mode 100644 index 00000000000000..e97aaaa581777d --- /dev/null +++ b/docs/features/feature-guides/compliance-forms/create-a-form.md @@ -0,0 +1,186 @@ +--- +title: Create a Form +--- + +import FeatureAvailability from '@site/src/components/FeatureAvailability'; + +# Create a DataHub Compliance Form + + +This guide will walk you through creating and assigning Compliance Forms, including: + +1. Creating a new Compliance Form +2. Building **Questions** for the Compliance Form +3. Assigning **Assets** for the Compliance Form +4. Selecting **Assignees** for the Compliance Form +5. Publishing a Compliance Form + +:::note +Managing Compliance Forms via the DataHub UI is only available in DataHub Cloud. If you are using DataHub Core, please refer to the [Compliance Forms API Guide](../../../api/tutorials/forms.md). +::: + +### Prerequisites + +In order to create, edit, or remove Compliance Forms, you must have the **Manage Compliance Forms** Platform privilege. + +### Step 1: Create a new Compliance Form + +From the navigation bar, head to **Govern** > **Compliance Forms**. Click **+ Create** to start building your Form. + +

+ View of all Compliance Forms +

+ +First up, provide the following details: + +1. **Name:** Select a unique and descriptive name for your Compliance Form that clearly communicates its purpose, such as **"PII Certification Q4 2024"**. + + _**Pro Tip:** This name will be displayed to Assignees when they are assigned tasks, so make it clear and detailed to ensure it conveys the intent of the Form effectively._ + +2. **Description:** Craft a concise yet informative description that explains the purpose of the Compliance Form. Include key details such as the importance of the initiative, its objectives, and the expected completion timeline. This helps Assignees understand the context and significance of their role in the process. + + _**Example:** "This Compliance Form is designed to ensure all datasets containing PII are reviewed and verified by Q4 2024. Completing this Form is critical for compliance with organizational and regulatory requirements."_ + +3. **Type:** Specify the collection type for the Form, based on your compliance requirements: + - **Completion:** The Form is considered complete once all required questions are answered for the selected Assets. We recommend this option for basic requirement completion use cases. + + - **Verification:** The Form is considered complete only when all required questions are answered for the selected Assets **and** an Assignee has explicitly "verified" the responses. We recommend this option when final sign-off by Assignees is necessary, ensuring they acknowledge the accuracy and validity of their responses. + +4. Next, click **Add Question** to begin building the requirements for your Form. + +

+ Create a new Compliance Form +

+ +### Step 2: Build Questions for your Form + +Next, define the Questions for your Compliance Forms. These are used to collect required information about selected assets, and must be completed by an Assignee in order for the Form to be considered complete. + +There are 5 different question types to choose from: + +* **Ownership:** Request one or more owners to be assigned to selected assets. Optionally restrict responses to a specific set of valid users, groups, and ownership types. + * _E.g. Who is responsible for ensuring the accuracy of this Dataset?_ +* **Domain:** Assign a Domain to the Asset, with the option to predefine the set of allowed Domains. + * _E.g. Which Domain does this Dashboard belong to? Sales, Marketing, Finance._ +* **Documentation:** Provide Documentation about the Asset and/or Column. + * _E.g. What is the primary use case of this Dataset? What caveats should others be aware of?_ +* **Glossary Terms:** Assign one or more Glossary Term to the Asset and/or Column, with the option to predefine the set of allowed Glossary Terms. + * _E.g. What types of personally identifiable information (PII) are included in this Asset? Email, Address, SSN, etc._ +* **Structured Properties:** Apply custom properties to an Asset and/or Column. + * _E.g. What date will this Dataset be deprecated and deleted?_ + +When creating a Question, use a clear and concise Title that is easy for Assignees to understand. In the Description, include additional context or instructions to guide their responses. Both the Title and Description will be visible to Assignees when completing the Form, so make sure to provide any specific hints or details they may need to answer the Question accurately and confidently. + +

+ Create a new Compliance Form prompt +

+ +### Step 3: Assign Assets to your Compliance Form + +Now that you have defined the Questions you want Assignees to complete, it's now time to assign the in-scope Assets for this exercise. + +In the **Assign Assets** section, you can easily target the specific set of Assets that are relevant for this Form with the following steps: + +1. Add a Condition or Group of Conditions +2. Choose the appropriate filter type, such as: + * Asset Type (Dataset, Chart, etc.) + * Platform (Snowflake, dbt, etc.) + * Domain (Sales, Marketing, Finance, etc.) + * Assigned Owners + * Assigned Glossary Terms +3. Decide between **All**, **Any**, or **None** of the filters should apply +4. Preview the relevant Assets to confirm you have applied the appropriate filters + +For example, you can apply filters to focus on all **Snowflake Datasets** that are also associated with the **Finance Domain**. This allows you to break down your compliance initiatives into manageable chunks, so you don't have to go after your entire data ecosystem in one go. + +

+ Assign assets to a Compliance Form +

+ +### Step 4: Select Assignees to complete your Compliance Form + +With the Questions and assigned Assets defined, the next step is to select the Assignees—the Users and/or Groups responsible for completing the Form. + +In the **Add Recipients** section, decide who is responsible for completing the Form: + +* **Asset Owners:** Any User that is assigned to one of the in-scope Assets will be able to complete the Form. This is useful for larger initiatives when you may not know the full set of Users. +* **Specific Users and/or Groups:** Select a specific set of Users and/or Groups within DataHub. This is useful when Ownership of the Assets may be poorly-defined. + +

+ Assign recipients to a Compliance Form +

+ +### Step 5: Publish your Form + +After defining the Questions, assigning Assets, and selecting the Assignees, your Form is ready to be published. Once published, Assignees will be notified to complete the Form for the Assets they are responsible for. + + +To publish a Form, simply click **Publish**. + +:::caution +Once you have published a Form, you **cannot** change or add Questions. You can, however, change the set of Assets and/or Assignees for the Form. +::: + +Not ready for primetime just yet? No worries! You also have the option to **Save Draft**. + +

+ Publish a Compliance Form +

+ +## FAQ and Troubleshooting + +**Does answering a Compliance Form Question update the selected Asset?** + +Yes! Compliance Forms serve as a powerful tool for gathering and updating key attributes for your mission-critical Data Assets at scale. When a Question is answered, the response directly updates the corresponding attributes of the selected Asset. + +**How does a Compliance Form interact with existing metadata?** + +If an Asset already has existing metadata that is also referenced in a Form Question, Assignees will have the option to confirm the existing value, overwrite the value, or append additional details. + +_You can find more details and examples in the [Complete a Form](complete-a-form.md#understanding-different-form-question-completion-states) guide._ + +**What is the difference between Completion and Verification Forms?** + +Both Form types are a way to configure a set of optional and/or required Questions for DataHub users to complete. When using Verification Forms, users will be presented with a final verification step once all required questions have been completed; you can think of this as a final acknowledgment of the accuracy of information submitted. + +**Can I assign multiple Forms to a single Asset?** + +You sure can! Please keep in mind that an Asset will only be considered Documented or Verified if all required questions are completed on all assigned Forms. + +**How will DataHub Users know that a Compliance Form has been assigned to them?** + +They have to check the Inbox on the navigation bar. There are no off-platform notifications for Compliance Forms at this time. + +**How do I track the progress of Form completion?** + +Great question. We are working on Compliance Forms Analytics that will directly show you the progress of your initiative across the selected Assets. Stay tuned! + +### API Tutorials + +- [API Guides on Documentation Form](../../../api/tutorials/forms.md) + +### Related Features + +- [DataHub Properties](../../feature-guides/properties.md) + +## Next Steps + +Now that you have created a DataHub Compliance Form, you're ready to [Complete a Compliance Form](complete-a-form.md). \ No newline at end of file diff --git a/docs/features/feature-guides/compliance-forms/overview.md b/docs/features/feature-guides/compliance-forms/overview.md new file mode 100644 index 00000000000000..86a6d8cc6dadfb --- /dev/null +++ b/docs/features/feature-guides/compliance-forms/overview.md @@ -0,0 +1,46 @@ +--- +title: Overview +--- + +import FeatureAvailability from '@site/src/components/FeatureAvailability'; + +# About DataHub Compliance Forms + + +**DataHub Compliance Forms** streamline the process of documenting, annotating, and classifying your most critical Data Assets through a collaborative, crowdsourced approach. + +With Compliance Forms, you can execute large-scale compliance initiatives by assigning tasks (e.g., documentation, tagging, or classification requirements) to the appropriate stakeholders — data owners, stewards, and subject matter experts. + +## What are Compliance Forms? + +A **Compliance Form** is a flexible and centrally managed tool that enables your data governance or compliance teams to define, enforce, and monitor requirements for specific Data Assets or Columns. + +A Compliance Form consists of: + +1. **Assets:** The Data Assets or Columns for which the Form must be completed. These represent the scope of the compliance initiative. + +2. **Questions:** The set of requirements or conditions that must be completed for each asset. Questions are a vehicle to collect key attributes for your data assets. These can range from simple to complex, with questions that require differing types of answers to complete. Examples include Descriptions, Domains, Owners, Tags, Glossary Terms, and custom Structured Properties. + +3. **Assignees:** The users or groups responsible for completing the Form (e.g., asset owners, domain experts, or stewards). + +Once a Compliance Form is defined, it can be published. When a form is published, the assignees who are required to complete the requirements will be notified via the Inbox of the tasks that they must complete. In addition, analytics will begin to be gathered about the assets that are meeting or violating the requirements in the form so you can understand your initiative's progress over time. + +### Why Use Compliance Forms? + +Compliance Forms enable organizations to: +- Standardize documentation and metadata across critical Data Assets. +- Crowdsource compliance-related tasks to domain experts who are best equipped to provide accurate information. +- Scale governance initiatives efficiently while maintaining accuracy and accountability. + +By leveraging Compliance Forms, organizations can ensure consistent metadata quality and foster collaboration between data experts and governance teams. + +

+ Sample Compliance Form +

+ +## Next Steps + +Now that you understand the basics of DataHub Compliance Forms, you're ready to [Create a Compliance Form](create-a-form.md). \ No newline at end of file diff --git a/docs/features/feature-guides/documentation-forms.md b/docs/features/feature-guides/documentation-forms.md deleted file mode 100644 index 2edeb8ce302d77..00000000000000 --- a/docs/features/feature-guides/documentation-forms.md +++ /dev/null @@ -1,113 +0,0 @@ -import FeatureAvailability from '@site/src/components/FeatureAvailability'; - -# About DataHub Documentation Forms - - -DataHub Documentation Forms streamline the process of setting documentation requirements and delegating annotation responsibilities to the relevant data asset owners, stewards, and subject matter experts. - -Forms are highly configurable, making it easy to ask the right questions of the right people, for a specific set of assets. - -## What are Documentation Forms? - -You can think of Documentation Forms as a survey for your data assets: a set of questions that must be answered in order for an asset to be considered properly documented. - -Verification Forms are an extension of Documentation Forms, requiring a final verification, or sign-off, on all responses before the asset can be considered Verified. This is useful for compliance and/or governance annotation initiatives where you want assignees to provide a final acknowledgement that the information provided is correct. - -## Creating and Assigning Documentation Forms - -Documentation Forms are defined via YAML with the following details: - -- Name and Description to help end-users understand the scope and use case -- Form Type, either Documentation or Verification - - Verification Forms require a final signoff, i.e. Verification, of all required questions before the Form can be considered complete -- Form Questions (aka "prompts") for end-users to complete - - Questions can be assigned at the asset-level and/or the field-level - - Asset-level questions can be configured to be required; by default, all questions are optional -- Assigned Assets, defined by: - - A set of specific asset URNs, OR - - Assets related to a set of filters, such as Type (Datasets, Dashboards, etc.), Platform (Snowflake, Looker, etc.), Domain (Product, Marketing, etc.), or Container (Schema, Folder, etc.) -- Optional: Form Assignees - - Optionally assign specific DataHub users/groups to complete the Form for all relevant assets - - If omitted, any Owner of an Asset can complete Forms assigned to that Asset - -Here's an example of defining a Documentation Form via YAML: -```yaml -- id: 123456 - # urn: "urn:li:form:123456" # optional if id is provided - type: VERIFICATION # Supported Types: DOCUMENTATION, VERIFICATION - name: "Metadata Initiative 2024" - description: "How we want to ensure the most important data assets in our organization have all of the most important and expected pieces of metadata filled out" - prompts: # Questions for Form assignees to complete - - id: "123" - title: "Data Retention Time" - description: "Apply Retention Time structured property to form" - type: STRUCTURED_PROPERTY - structured_property_id: io.acryl.privacy.retentionTime - required: True # optional; default value is False - entities: # Either pass a list of urns or a group of filters. This example shows a list of urns - urns: - - urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD) - # optionally assign the form to a specific set of users and/or groups - # when omitted, form will be assigned to Asset owners - actors: - users: - - urn:li:corpuser:jane@email.com # note: these should be URNs - - urn:li:corpuser:john@email.com - groups: - - urn:li:corpGroup:team@email.com # note: these should be URNs - -``` - -:::note -Documentation Forms currently only support defining Structured Properties as Form Questions -::: - - - - - -## Additional Resources - -### Videos - -**Asset Verification in DataHub Cloud** - -

- -

- -## FAQ and Troubleshooting - -**What is the difference between Documentation and Verification Forms?** - -Both form types are a way to configure a set of optional and/or required questions for DataHub users to complete. When using Verification Forms, users will be presented with a final verification step once all required questions have been completed; you can think of this as a final acknowledgement of the accuracy of information submitted. - -**Who is able to complete Forms in DataHub?** - -By default, any owner of an Asset will be able to respond to questions assigned via a Form. - -When assigning a Form to an Asset, you can optionally assign specific DataHub users/groups to fill them out. - -**Can I assign multiple Forms to a single asset?** - -You sure can! Please keep in mind that an Asset will only be considered Documented or Verified if all required questions are completed on all assiged Forms. - -### API Tutorials - -- [API Guides on Documentation Form](../../../docs/api/tutorials/forms.md) - -:::note -You must create a Structured Property before including it in a Documentation Form. -To learn more about creating Structured Properties via CLI, please see the [Create Structured Properties](/docs/api/tutorials/structured-properties.md) tutorial. -::: - -### Related Features - -- [DataHub Properties](/docs/features/feature-guides/properties.md) \ No newline at end of file diff --git a/docs/features/feature-guides/properties.md b/docs/features/feature-guides/properties.md index 0d961b9ceac4ff..abdb736ad2a429 100644 --- a/docs/features/feature-guides/properties.md +++ b/docs/features/feature-guides/properties.md @@ -155,4 +155,4 @@ Please see the following API guides related to Custom and Structured Properties: ### Related Features -- [Documentation Forms](/docs/features/feature-guides/documentation-forms.md) \ No newline at end of file +- [Compliance Forms](compliance-forms/overview.md) \ No newline at end of file diff --git a/li-utils/src/main/java/com/linkedin/metadata/Constants.java b/li-utils/src/main/java/com/linkedin/metadata/Constants.java index 077e0e2b666be1..9c608187342e8c 100644 --- a/li-utils/src/main/java/com/linkedin/metadata/Constants.java +++ b/li-utils/src/main/java/com/linkedin/metadata/Constants.java @@ -51,6 +51,7 @@ public class Constants { // App sources public static final String UI_SOURCE = "ui"; public static final String SYSTEM_UPDATE_SOURCE = "systemUpdate"; + public static final String METADATA_TESTS_SOURCE = "metadataTests"; /** Entities */ public static final String CORP_USER_ENTITY_NAME = "corpuser"; diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_connection.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_connection.py index 397606400d389c..2239338972d9be 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_connection.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_connection.py @@ -43,6 +43,7 @@ "EXTERNAL_BROWSER_AUTHENTICATOR": EXTERNAL_BROWSER_AUTHENTICATOR, "KEY_PAIR_AUTHENTICATOR": KEY_PAIR_AUTHENTICATOR, "OAUTH_AUTHENTICATOR": OAUTH_AUTHENTICATOR, + "OAUTH_AUTHENTICATOR_TOKEN": OAUTH_AUTHENTICATOR, } _SNOWFLAKE_HOST_SUFFIX = ".snowflakecomputing.com" @@ -104,6 +105,10 @@ class SnowflakeConnectionConfig(ConfigModel): description="Connect args to pass to Snowflake SqlAlchemy driver", exclude=True, ) + token: Optional[str] = pydantic.Field( + default=None, + description="OAuth token from external identity provider. Not recommended for most use cases because it will not be able to refresh once expired.", + ) def get_account(self) -> str: assert self.account_id @@ -148,6 +153,18 @@ def authenticator_type_is_valid(cls, v, values): logger.info(f"using authenticator type '{v}'") return v + @pydantic.validator("token", always=True) + def validate_token_oauth_config(cls, v, values): + auth_type = values.get("authentication_type") + if auth_type == "OAUTH_AUTHENTICATOR_TOKEN": + if not v: + raise ValueError("Token required for OAUTH_AUTHENTICATOR_TOKEN.") + elif v is not None: + raise ValueError( + "Token can only be provided when using OAUTH_AUTHENTICATOR_TOKEN" + ) + return v + @staticmethod def _check_oauth_config(oauth_config: Optional[OAuthConfiguration]) -> None: if oauth_config is None: @@ -333,6 +350,17 @@ def get_native_connection(self) -> NativeSnowflakeConnection: application=_APPLICATION_NAME, **connect_args, ) + elif self.authentication_type == "OAUTH_AUTHENTICATOR_TOKEN": + return snowflake.connector.connect( + user=self.username, + account=self.account_id, + authenticator="oauth", + token=self.token, # Token generated externally and provided directly to the recipe + warehouse=self.warehouse, + role=self.role, + application=_APPLICATION_NAME, + **connect_args, + ) elif self.authentication_type == "OAUTH_AUTHENTICATOR": return self.get_oauth_connection() elif self.authentication_type == "KEY_PAIR_AUTHENTICATOR": diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index 41ffcb95a7cc43..64aa8cfc6ef6c7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -1197,6 +1197,8 @@ def _run_sql_parser( ) else: self.report.num_view_definitions_parsed += 1 + if raw_lineage.out_tables != [view_urn]: + self.report.num_view_definitions_view_urn_mismatch += 1 return view_definition_lineage_helper(raw_lineage, view_urn) def get_db_schema(self, dataset_identifier: str) -> Tuple[Optional[str], str]: diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_report.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_report.py index c1f722b5d1e783..c445ce44a91449 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_report.py @@ -48,6 +48,7 @@ class SQLSourceReport( query_combiner: Optional[SQLAlchemyQueryCombinerReport] = None num_view_definitions_parsed: int = 0 + num_view_definitions_view_urn_mismatch: int = 0 num_view_definitions_failed_parsing: int = 0 num_view_definitions_failed_column_parsing: int = 0 view_definitions_parsing_failures: LossyList[str] = field(default_factory=LossyList) diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index 9a6cde78cf10d3..f758746193cd83 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -974,6 +974,8 @@ def _run_sql_parser( ) else: self.report.num_view_definitions_parsed += 1 + if raw_lineage.out_tables != [view_urn]: + self.report.num_view_definitions_view_urn_mismatch += 1 return view_definition_lineage_helper(raw_lineage, view_urn) def get_view_lineage(self) -> Iterable[MetadataWorkUnit]: diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py index 9adb792a4be518..4ff68574bf20e6 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py @@ -1243,13 +1243,19 @@ def infer_output_schema(result: SqlParsingResult) -> Optional[List[SchemaFieldCl def view_definition_lineage_helper( result: SqlParsingResult, view_urn: str ) -> SqlParsingResult: - if result.query_type is QueryType.SELECT: + if result.query_type is QueryType.SELECT or ( + result.out_tables and result.out_tables != [view_urn] + ): # Some platforms (e.g. postgres) store only ` . For such view definitions, `result.out_tables` and # `result.column_lineage[].downstream` are empty in `sqlglot_lineage` response, whereas upstream # details and downstream column details are extracted correctly. # Here, we inject view V's urn in `result.out_tables` and `result.column_lineage[].downstream` # to get complete lineage result. + + # Some platforms(e.g. mssql) may have slightly different view name in view definition than + # actual view name used elsewhere. Therefore we overwrite downstream table for such cases as well. + result.out_tables = [view_urn] if result.column_lineage: for col_result in result.column_lineage: diff --git a/metadata-ingestion/tests/integration/oracle/golden_test_ingest_with_database.json b/metadata-ingestion/tests/integration/oracle/golden_test_ingest_with_database.json index cbcadde6feb213..abd9b2350638a2 100644 --- a/metadata-ingestion/tests/integration/oracle/golden_test_ingest_with_database.json +++ b/metadata-ingestion/tests/integration/oracle/golden_test_ingest_with_database.json @@ -17,7 +17,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -33,7 +33,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -49,7 +49,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -67,7 +67,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -83,7 +83,23 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:937a38ee28b69ecae38665c5e842d0ad", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:0e497517e191d344b0c403231bc708d0" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -106,7 +122,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -122,7 +138,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -138,7 +154,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -156,23 +172,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "container", - "entityUrn": "urn:li:container:937a38ee28b69ecae38665c5e842d0ad", - "changeType": "UPSERT", - "aspectName": "container", - "aspect": { - "json": { - "container": "urn:li:container:0e497517e191d344b0c403231bc708d0" - } - }, - "systemMetadata": { - "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -193,7 +193,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -209,7 +209,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -272,7 +272,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -290,7 +290,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -315,7 +315,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -331,7 +331,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -394,7 +394,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -412,7 +412,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -437,7 +437,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -453,7 +453,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -519,7 +519,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -537,7 +537,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -555,7 +555,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -580,7 +580,23 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:1965527855ae77f259a8ddea2b8eed2f", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:0e497517e191d344b0c403231bc708d0" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -603,7 +619,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -619,7 +635,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -635,7 +651,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -653,23 +669,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "container", - "entityUrn": "urn:li:container:1965527855ae77f259a8ddea2b8eed2f", - "changeType": "UPSERT", - "aspectName": "container", - "aspect": { - "json": { - "container": "urn:li:container:0e497517e191d344b0c403231bc708d0" - } - }, - "systemMetadata": { - "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -690,7 +690,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -706,7 +706,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -769,7 +769,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -787,7 +787,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -812,7 +812,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -828,7 +828,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -891,7 +891,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -909,7 +909,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -934,7 +934,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -950,7 +950,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -1016,7 +1016,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -1034,7 +1034,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -1052,7 +1052,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, @@ -1077,13 +1077,13 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:oracle,oradoc.schema1.mock_view,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:oracle,OraDoc.schema1.view1,PROD)", "changeType": "UPSERT", "aspectName": "upstreamLineage", "aspect": { @@ -1106,7 +1106,7 @@ ], "downstreamType": "FIELD", "downstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:oracle,oradoc.schema1.mock_view,PROD),MOCK_COLUMN1)" + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:oracle,OraDoc.schema1.view1,PROD),MOCK_COLUMN1)" ], "confidenceScore": 1.0 }, @@ -1117,7 +1117,7 @@ ], "downstreamType": "FIELD", "downstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:oracle,oradoc.schema1.mock_view,PROD),MOCK_COLUMN2)" + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:oracle,OraDoc.schema1.view1,PROD),MOCK_COLUMN2)" ], "confidenceScore": 1.0 } @@ -1126,13 +1126,13 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:oracle,oradoc.schema2.mock_view,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:oracle,OraDoc.schema2.view1,PROD)", "changeType": "UPSERT", "aspectName": "upstreamLineage", "aspect": { @@ -1155,7 +1155,7 @@ ], "downstreamType": "FIELD", "downstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:oracle,oradoc.schema2.mock_view,PROD),MOCK_COLUMN1)" + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:oracle,OraDoc.schema2.view1,PROD),MOCK_COLUMN1)" ], "confidenceScore": 1.0 }, @@ -1166,7 +1166,7 @@ ], "downstreamType": "FIELD", "downstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:oracle,oradoc.schema2.mock_view,PROD),MOCK_COLUMN2)" + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:oracle,OraDoc.schema2.view1,PROD),MOCK_COLUMN2)" ], "confidenceScore": 1.0 } @@ -1175,39 +1175,7 @@ }, "systemMetadata": { "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:oracle,oradoc.schema1.mock_view,PROD)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": false - } - }, - "systemMetadata": { - "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:oracle,oradoc.schema2.mock_view,PROD)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": false - } - }, - "systemMetadata": { - "lastObserved": 1643871600000, - "runId": "oracle-2022_02_03-07_00_00", + "runId": "oracle-2022_02_03-07_00_00-uzcdxn", "lastRunId": "no-run-id-provided" } } diff --git a/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_to_file.json b/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_to_file.json index 54821347fd28b8..72dcda25c1296c 100644 --- a/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_to_file.json +++ b/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_to_file.json @@ -113,11 +113,11 @@ "aspect": { "json": { "customProperties": { - "job_id": "4130c37d-146c-43da-a671-dd9a413a44b3", + "job_id": "c2d77890-83ba-435f-879b-1c77fa38dd47", "job_name": "Weekly Demo Data Backup", "description": "No description available.", - "date_created": "2024-11-22 12:58:03.260000", - "date_modified": "2024-11-22 12:58:03.440000", + "date_created": "2024-12-05 16:44:43.910000", + "date_modified": "2024-12-05 16:44:44.043000", "step_id": "1", "step_name": "Set database to read only", "subsystem": "TSQL", @@ -2282,8 +2282,8 @@ "code": "CREATE PROCEDURE [Foo].[Proc.With.SpecialChar] @ID INT\nAS\n SELECT @ID AS ThatDB;\n", "input parameters": "['@ID']", "parameter @ID": "{'type': 'int'}", - "date_created": "2024-11-22 12:58:03.137000", - "date_modified": "2024-11-22 12:58:03.137000" + "date_created": "2024-12-05 16:44:43.800000", + "date_modified": "2024-12-05 16:44:43.800000" }, "externalUrl": "", "name": "DemoData.Foo.Proc.With.SpecialChar", @@ -2310,8 +2310,8 @@ "depending_on_procedure": "{}", "code": "CREATE PROCEDURE [Foo].[NewProc]\n AS\n BEGIN\n --insert into items table from salesreason table\n insert into Foo.Items (ID, ItemName)\n SELECT TempID, Name\n FROM Foo.SalesReason;\n\n\n IF OBJECT_ID('Foo.age_dist', 'U') IS NULL\n BEGIN\n -- Create and populate if table doesn't exist\n SELECT Age, COUNT(*) as Count\n INTO Foo.age_dist\n FROM Foo.Persons\n GROUP BY Age\n END\n ELSE\n BEGIN\n -- Update existing table\n TRUNCATE TABLE Foo.age_dist;\n\n INSERT INTO Foo.age_dist (Age, Count)\n SELECT Age, COUNT(*) as Count\n FROM Foo.Persons\n GROUP BY Age\n END\n\n SELECT ID, Age INTO #TEMPTABLE FROM NewData.FooNew.PersonsNew\n \n UPDATE DemoData.Foo.Persons\n SET Age = t.Age\n FROM DemoData.Foo.Persons p\n JOIN #TEMPTABLE t ON p.ID = t.ID\n\n END\n", "input parameters": "[]", - "date_created": "2024-11-22 12:58:03.140000", - "date_modified": "2024-11-22 12:58:03.140000" + "date_created": "2024-12-05 16:44:43.803000", + "date_modified": "2024-12-05 16:44:43.803000" }, "externalUrl": "", "name": "DemoData.Foo.NewProc", @@ -4883,7 +4883,7 @@ }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mssql,demodata.foo.personsview,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mssql,DemoData.Foo.PersonsView,PROD)", "changeType": "UPSERT", "aspectName": "upstreamLineage", "aspect": { @@ -4908,7 +4908,7 @@ }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mssql,newdata.foonew.view1,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mssql,NewData.FooNew.View1,PROD)", "changeType": "UPSERT", "aspectName": "upstreamLineage", "aspect": { @@ -4931,7 +4931,7 @@ ], "downstreamType": "FIELD", "downstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mssql,newdata.foonew.view1,PROD),firstname)" + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mssql,NewData.FooNew.View1,PROD),firstname)" ], "confidenceScore": 1.0 }, @@ -4942,7 +4942,7 @@ ], "downstreamType": "FIELD", "downstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mssql,newdata.foonew.view1,PROD),lastname)" + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mssql,NewData.FooNew.View1,PROD),lastname)" ], "confidenceScore": 1.0 } @@ -5034,37 +5034,5 @@ "runId": "mssql-test", "lastRunId": "no-run-id-provided" } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mssql,demodata.foo.personsview,PROD)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": false - } - }, - "systemMetadata": { - "lastObserved": 1615443388097, - "runId": "mssql-test", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mssql,newdata.foonew.view1,PROD)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": false - } - }, - "systemMetadata": { - "lastObserved": 1615443388097, - "runId": "mssql-test", - "lastRunId": "no-run-id-provided" - } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json b/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json index 1d702214fedf79..0df89ff1eb94d7 100644 --- a/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json +++ b/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json @@ -113,11 +113,11 @@ "aspect": { "json": { "customProperties": { - "job_id": "2a055367-5e6a-4162-b3a9-dd60f52c79a8", + "job_id": "c2d77890-83ba-435f-879b-1c77fa38dd47", "job_name": "Weekly Demo Data Backup", "description": "No description available.", - "date_created": "2024-11-26 07:22:19.640000", - "date_modified": "2024-11-26 07:22:19.773000", + "date_created": "2024-12-05 16:44:43.910000", + "date_modified": "2024-12-05 16:44:44.043000", "step_id": "1", "step_name": "Set database to read only", "subsystem": "TSQL", @@ -2282,8 +2282,8 @@ "code": "CREATE PROCEDURE [Foo].[Proc.With.SpecialChar] @ID INT\nAS\n SELECT @ID AS ThatDB;\n", "input parameters": "['@ID']", "parameter @ID": "{'type': 'int'}", - "date_created": "2024-11-26 07:22:19.510000", - "date_modified": "2024-11-26 07:22:19.510000" + "date_created": "2024-12-05 16:44:43.800000", + "date_modified": "2024-12-05 16:44:43.800000" }, "externalUrl": "", "name": "DemoData.Foo.Proc.With.SpecialChar", @@ -2630,7 +2630,7 @@ }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mssql,demodata.foo.personsview,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mssql,DemoData.Foo.PersonsView,PROD)", "changeType": "UPSERT", "aspectName": "upstreamLineage", "aspect": { @@ -2716,21 +2716,5 @@ "runId": "mssql-test", "lastRunId": "no-run-id-provided" } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mssql,demodata.foo.personsview,PROD)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": false - } - }, - "systemMetadata": { - "lastObserved": 1615443388097, - "runId": "mssql-test", - "lastRunId": "no-run-id-provided" - } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_to_file.json b/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_to_file.json index 3836e587ef8cf4..b67ebfb206883a 100644 --- a/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_to_file.json +++ b/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_to_file.json @@ -113,11 +113,11 @@ "aspect": { "json": { "customProperties": { - "job_id": "4130c37d-146c-43da-a671-dd9a413a44b3", + "job_id": "c2d77890-83ba-435f-879b-1c77fa38dd47", "job_name": "Weekly Demo Data Backup", "description": "No description available.", - "date_created": "2024-11-22 12:58:03.260000", - "date_modified": "2024-11-22 12:58:03.440000", + "date_created": "2024-12-05 16:44:43.910000", + "date_modified": "2024-12-05 16:44:44.043000", "step_id": "1", "step_name": "Set database to read only", "subsystem": "TSQL", @@ -2282,8 +2282,8 @@ "code": "CREATE PROCEDURE [Foo].[Proc.With.SpecialChar] @ID INT\nAS\n SELECT @ID AS ThatDB;\n", "input parameters": "['@ID']", "parameter @ID": "{'type': 'int'}", - "date_created": "2024-11-22 12:58:03.137000", - "date_modified": "2024-11-22 12:58:03.137000" + "date_created": "2024-12-05 16:44:43.800000", + "date_modified": "2024-12-05 16:44:43.800000" }, "externalUrl": "", "name": "DemoData.Foo.Proc.With.SpecialChar", @@ -2310,8 +2310,8 @@ "depending_on_procedure": "{}", "code": "CREATE PROCEDURE [Foo].[NewProc]\n AS\n BEGIN\n --insert into items table from salesreason table\n insert into Foo.Items (ID, ItemName)\n SELECT TempID, Name\n FROM Foo.SalesReason;\n\n\n IF OBJECT_ID('Foo.age_dist', 'U') IS NULL\n BEGIN\n -- Create and populate if table doesn't exist\n SELECT Age, COUNT(*) as Count\n INTO Foo.age_dist\n FROM Foo.Persons\n GROUP BY Age\n END\n ELSE\n BEGIN\n -- Update existing table\n TRUNCATE TABLE Foo.age_dist;\n\n INSERT INTO Foo.age_dist (Age, Count)\n SELECT Age, COUNT(*) as Count\n FROM Foo.Persons\n GROUP BY Age\n END\n\n SELECT ID, Age INTO #TEMPTABLE FROM NewData.FooNew.PersonsNew\n \n UPDATE DemoData.Foo.Persons\n SET Age = t.Age\n FROM DemoData.Foo.Persons p\n JOIN #TEMPTABLE t ON p.ID = t.ID\n\n END\n", "input parameters": "[]", - "date_created": "2024-11-22 12:58:03.140000", - "date_modified": "2024-11-22 12:58:03.140000" + "date_created": "2024-12-05 16:44:43.803000", + "date_modified": "2024-12-05 16:44:43.803000" }, "externalUrl": "", "name": "DemoData.Foo.NewProc", @@ -2658,7 +2658,7 @@ }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mssql,demodata.foo.personsview,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mssql,DemoData.Foo.PersonsView,PROD)", "changeType": "UPSERT", "aspectName": "upstreamLineage", "aspect": { @@ -2760,21 +2760,5 @@ "runId": "mssql-test", "lastRunId": "no-run-id-provided" } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mssql,demodata.foo.personsview,PROD)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": false - } - }, - "systemMetadata": { - "lastObserved": 1615443388097, - "runId": "mssql-test", - "lastRunId": "no-run-id-provided" - } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py b/metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py index 161dfa2b4e78f3..3284baf103e5af 100644 --- a/metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py +++ b/metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py @@ -130,6 +130,60 @@ def test_snowflake_oauth_happy_paths(): ) +def test_snowflake_oauth_token_happy_path(): + assert SnowflakeV2Config.parse_obj( + { + "account_id": "test", + "authentication_type": "OAUTH_AUTHENTICATOR_TOKEN", + "token": "valid-token", + "username": "test-user", + "oauth_config": None, + } + ) + + +def test_snowflake_oauth_token_without_token(): + with pytest.raises( + ValidationError, match="Token required for OAUTH_AUTHENTICATOR_TOKEN." + ): + SnowflakeV2Config.parse_obj( + { + "account_id": "test", + "authentication_type": "OAUTH_AUTHENTICATOR_TOKEN", + "username": "test-user", + } + ) + + +def test_snowflake_oauth_token_with_wrong_auth_type(): + with pytest.raises( + ValueError, + match="Token can only be provided when using OAUTH_AUTHENTICATOR_TOKEN.", + ): + SnowflakeV2Config.parse_obj( + { + "account_id": "test", + "authentication_type": "OAUTH_AUTHENTICATOR", + "token": "some-token", + "username": "test-user", + } + ) + + +def test_snowflake_oauth_token_with_empty_token(): + with pytest.raises( + ValidationError, match="Token required for OAUTH_AUTHENTICATOR_TOKEN." + ): + SnowflakeV2Config.parse_obj( + { + "account_id": "test", + "authentication_type": "OAUTH_AUTHENTICATOR_TOKEN", + "token": "", + "username": "test-user", + } + ) + + default_config_dict: Dict[str, Any] = { "username": "user", "password": "password", diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index a0a55cf505cf35..bf3481205fb5ab 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -855,6 +855,7 @@ private List ingestAspectsToLocalDB( if (inputBatch.containsDuplicateAspects()) { log.warn(String.format("Batch contains duplicates: %s", inputBatch)); + MetricUtils.counter(EntityServiceImpl.class, "batch_with_duplicate").inc(); } return aspectDao @@ -928,6 +929,7 @@ private List ingestAspectsToLocalDB( // No changes, return if (changeMCPs.isEmpty()) { + MetricUtils.counter(EntityServiceImpl.class, "batch_empty").inc(); return Collections.emptyList(); } @@ -935,6 +937,7 @@ private List ingestAspectsToLocalDB( ValidationExceptionCollection exceptions = AspectsBatch.validatePreCommit(changeMCPs, opContext.getRetrieverContext().get()); if (!exceptions.isEmpty()) { + MetricUtils.counter(EntityServiceImpl.class, "batch_validation_exception").inc(); throw new ValidationException(collectMetrics(exceptions).toString()); } @@ -972,10 +975,13 @@ This condition is specifically for an older conditional write ingestAspectIfNotP */ if (overwrite || databaseAspect == null) { result = - ingestAspectToLocalDB(txContext, writeItem, databaseSystemAspect) - .toBuilder() - .request(writeItem) - .build(); + Optional.ofNullable( + ingestAspectToLocalDB( + txContext, writeItem, databaseSystemAspect)) + .map( + optResult -> + optResult.toBuilder().request(writeItem).build()) + .orElse(null); } else { RecordTemplate oldValue = databaseSystemAspect.getRecordTemplate(); @@ -996,49 +1002,56 @@ This condition is specifically for an older conditional write ingestAspectIfNotP return result; }) + .filter(Objects::nonNull) .collect(Collectors.toList()); - // commit upserts prior to retention or kafka send, if supported by impl - if (txContext != null) { - txContext.commitAndContinue(); - } - long took = TimeUnit.NANOSECONDS.toMillis(ingestToLocalDBTimer.stop()); - if (took > DB_TIMER_LOG_THRESHOLD_MS) { - log.info("Ingestion of aspects batch to database took {} ms", took); - } + if (!upsertResults.isEmpty()) { + // commit upserts prior to retention or kafka send, if supported by impl + if (txContext != null) { + txContext.commitAndContinue(); + } + long took = TimeUnit.NANOSECONDS.toMillis(ingestToLocalDBTimer.stop()); + if (took > DB_TIMER_LOG_THRESHOLD_MS) { + log.info("Ingestion of aspects batch to database took {} ms", took); + } - // Retention optimization and tx - if (retentionService != null) { - List retentionBatch = - upsertResults.stream() - // Only consider retention when there was a previous version - .filter( - result -> - batchAspects.containsKey(result.getUrn().toString()) - && batchAspects - .get(result.getUrn().toString()) - .containsKey(result.getRequest().getAspectName())) - .filter( - result -> { - RecordTemplate oldAspect = result.getOldValue(); - RecordTemplate newAspect = result.getNewValue(); - // Apply retention policies if there was an update to existing aspect - // value - return oldAspect != newAspect - && oldAspect != null - && retentionService != null; - }) - .map( - result -> - RetentionService.RetentionContext.builder() - .urn(result.getUrn()) - .aspectName(result.getRequest().getAspectName()) - .maxVersion(Optional.of(result.getMaxVersion())) - .build()) - .collect(Collectors.toList()); - retentionService.applyRetentionWithPolicyDefaults(opContext, retentionBatch); + // Retention optimization and tx + if (retentionService != null) { + List retentionBatch = + upsertResults.stream() + // Only consider retention when there was a previous version + .filter( + result -> + batchAspects.containsKey(result.getUrn().toString()) + && batchAspects + .get(result.getUrn().toString()) + .containsKey(result.getRequest().getAspectName())) + .filter( + result -> { + RecordTemplate oldAspect = result.getOldValue(); + RecordTemplate newAspect = result.getNewValue(); + // Apply retention policies if there was an update to existing + // aspect + // value + return oldAspect != newAspect + && oldAspect != null + && retentionService != null; + }) + .map( + result -> + RetentionService.RetentionContext.builder() + .urn(result.getUrn()) + .aspectName(result.getRequest().getAspectName()) + .maxVersion(Optional.of(result.getMaxVersion())) + .build()) + .collect(Collectors.toList()); + retentionService.applyRetentionWithPolicyDefaults(opContext, retentionBatch); + } else { + log.warn("Retention service is missing!"); + } } else { - log.warn("Retention service is missing!"); + MetricUtils.counter(EntityServiceImpl.class, "batch_empty_transaction").inc(); + log.warn("Empty transaction detected. {}", inputBatch); } return upsertResults; @@ -2506,7 +2519,7 @@ private Map getEnvelopedAspects( * @param databaseAspect The aspect as it exists in the database. * @return result object */ - @Nonnull + @Nullable private UpdateAspectResult ingestAspectToLocalDB( @Nullable TransactionContext txContext, @Nonnull final ChangeMCP writeItem, @@ -2520,6 +2533,9 @@ private UpdateAspectResult ingestAspectToLocalDB( .setLastRunId(writeItem.getSystemMetadata().getRunId(GetMode.NULL), SetMode.IGNORE_NULL); // 2. Compare the latest existing and new. + final RecordTemplate databaseValue = + databaseAspect == null ? null : databaseAspect.getRecordTemplate(); + final EntityAspect.EntitySystemAspect previousBatchAspect = (EntityAspect.EntitySystemAspect) writeItem.getPreviousSystemAspect(); final RecordTemplate previousValue = @@ -2528,7 +2544,7 @@ private UpdateAspectResult ingestAspectToLocalDB( // 3. If there is no difference between existing and new, we just update // the lastObserved in system metadata. RunId should stay as the original runId if (previousValue != null - && DataTemplateUtil.areEqual(previousValue, writeItem.getRecordTemplate())) { + && DataTemplateUtil.areEqual(databaseValue, writeItem.getRecordTemplate())) { SystemMetadata latestSystemMetadata = previousBatchAspect.getSystemMetadata(); latestSystemMetadata.setLastObserved(writeItem.getSystemMetadata().getLastObserved()); @@ -2564,45 +2580,49 @@ private UpdateAspectResult ingestAspectToLocalDB( } // 4. Save the newValue as the latest version - log.debug( - "Ingesting aspect with name {}, urn {}", writeItem.getAspectName(), writeItem.getUrn()); - String newValueStr = EntityApiUtils.toJsonAspect(writeItem.getRecordTemplate()); - long versionOfOld = - aspectDao.saveLatestAspect( - txContext, - writeItem.getUrn().toString(), - writeItem.getAspectName(), - previousBatchAspect == null ? null : EntityApiUtils.toJsonAspect(previousValue), - previousBatchAspect == null ? null : previousBatchAspect.getCreatedBy(), - previousBatchAspect == null - ? null - : previousBatchAspect.getEntityAspect().getCreatedFor(), - previousBatchAspect == null ? null : previousBatchAspect.getCreatedOn(), - previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadataRaw(), - newValueStr, - writeItem.getAuditStamp().getActor().toString(), - writeItem.getAuditStamp().hasImpersonator() - ? writeItem.getAuditStamp().getImpersonator().toString() - : null, - new Timestamp(writeItem.getAuditStamp().getTime()), - EntityApiUtils.toJsonAspect(writeItem.getSystemMetadata()), - writeItem.getNextAspectVersion()); - - // metrics - aspectDao.incrementWriteMetrics( - writeItem.getAspectName(), 1, newValueStr.getBytes(StandardCharsets.UTF_8).length); - - return UpdateAspectResult.builder() - .urn(writeItem.getUrn()) - .oldValue(previousValue) - .newValue(writeItem.getRecordTemplate()) - .oldSystemMetadata( - previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadata()) - .newSystemMetadata(writeItem.getSystemMetadata()) - .operation(MetadataAuditOperation.UPDATE) - .auditStamp(writeItem.getAuditStamp()) - .maxVersion(versionOfOld) - .build(); + if (!DataTemplateUtil.areEqual(databaseValue, writeItem.getRecordTemplate())) { + log.debug( + "Ingesting aspect with name {}, urn {}", writeItem.getAspectName(), writeItem.getUrn()); + String newValueStr = EntityApiUtils.toJsonAspect(writeItem.getRecordTemplate()); + long versionOfOld = + aspectDao.saveLatestAspect( + txContext, + writeItem.getUrn().toString(), + writeItem.getAspectName(), + previousBatchAspect == null ? null : EntityApiUtils.toJsonAspect(previousValue), + previousBatchAspect == null ? null : previousBatchAspect.getCreatedBy(), + previousBatchAspect == null + ? null + : previousBatchAspect.getEntityAspect().getCreatedFor(), + previousBatchAspect == null ? null : previousBatchAspect.getCreatedOn(), + previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadataRaw(), + newValueStr, + writeItem.getAuditStamp().getActor().toString(), + writeItem.getAuditStamp().hasImpersonator() + ? writeItem.getAuditStamp().getImpersonator().toString() + : null, + new Timestamp(writeItem.getAuditStamp().getTime()), + EntityApiUtils.toJsonAspect(writeItem.getSystemMetadata()), + writeItem.getNextAspectVersion()); + + // metrics + aspectDao.incrementWriteMetrics( + writeItem.getAspectName(), 1, newValueStr.getBytes(StandardCharsets.UTF_8).length); + + return UpdateAspectResult.builder() + .urn(writeItem.getUrn()) + .oldValue(previousValue) + .newValue(writeItem.getRecordTemplate()) + .oldSystemMetadata( + previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadata()) + .newSystemMetadata(writeItem.getSystemMetadata()) + .operation(MetadataAuditOperation.UPDATE) + .auditStamp(writeItem.getAuditStamp()) + .maxVersion(versionOfOld) + .build(); + } + + return null; } private static boolean shouldAspectEmitChangeLog(@Nonnull final AspectSpec aspectSpec) { diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java index f2ed2fddba7654..a1000fd02abfe1 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java @@ -1,8 +1,10 @@ package com.linkedin.metadata.entity; +import static com.linkedin.metadata.Constants.APP_SOURCE; import static com.linkedin.metadata.Constants.CORP_USER_ENTITY_NAME; import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME; import static com.linkedin.metadata.Constants.GLOBAL_TAGS_ASPECT_NAME; +import static com.linkedin.metadata.Constants.METADATA_TESTS_SOURCE; import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; @@ -19,6 +21,7 @@ import com.linkedin.common.urn.UrnUtils; import com.linkedin.data.template.DataTemplateUtil; import com.linkedin.data.template.RecordTemplate; +import com.linkedin.data.template.StringMap; import com.linkedin.entity.EnvelopedAspect; import com.linkedin.identity.CorpUserInfo; import com.linkedin.metadata.AspectGenerationUtils; @@ -61,6 +64,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import org.apache.commons.lang3.tuple.Triple; import org.testng.Assert; import org.testng.annotations.BeforeMethod; @@ -534,8 +538,8 @@ public void testBatchPatchWithTrailingNoOp() throws Exception { opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME); assertEquals( envelopedAspect.getSystemMetadata().getVersion(), - "2", - "Expected version 2. 1 - Initial, + 1 batch operation (1 add, 1 remove)"); + "3", + "Expected version 3. 1 - Initial, + 1 add, 1 remove"); assertEquals( new GlobalTags(envelopedAspect.getValue().data()) .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()), @@ -649,7 +653,7 @@ public void testBatchPatchAdd() throws Exception { EnvelopedAspect envelopedAspect = _entityServiceImpl.getLatestEnvelopedAspect( opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME); - assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "3", "Expected version 3"); + assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "4", "Expected version 4"); assertEquals( new GlobalTags(envelopedAspect.getValue().data()) .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()), @@ -657,6 +661,95 @@ public void testBatchPatchAdd() throws Exception { "Expected all tags"); } + @Test + public void testBatchPatchAddDuplicate() throws Exception { + Urn entityUrn = + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:snowflake,testBatchPatchAdd,PROD)"); + List initialTags = + List.of( + TagUrn.createFromString("urn:li:tag:__default_large_table"), + TagUrn.createFromString("urn:li:tag:__default_low_queries"), + TagUrn.createFromString("urn:li:tag:__default_low_changes"), + TagUrn.createFromString("urn:li:tag:!10TB+ tables")) + .stream() + .map(tag -> new TagAssociation().setTag(tag)) + .collect(Collectors.toList()); + TagUrn tag2 = TagUrn.createFromString("urn:li:tag:$ 1TB+"); + + SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata(); + + SystemMetadata patchSystemMetadata = new SystemMetadata(); + patchSystemMetadata.setLastObserved(systemMetadata.getLastObserved() + 1); + patchSystemMetadata.setProperties(new StringMap(Map.of(APP_SOURCE, METADATA_TESTS_SOURCE))); + + ChangeItemImpl initialAspectTag1 = + ChangeItemImpl.builder() + .urn(entityUrn) + .aspectName(GLOBAL_TAGS_ASPECT_NAME) + .recordTemplate(new GlobalTags().setTags(new TagAssociationArray(initialTags))) + .systemMetadata(systemMetadata.copy()) + .auditStamp(TEST_AUDIT_STAMP) + .build(TestOperationContexts.emptyAspectRetriever(null)); + + PatchItemImpl patchAdd2 = + PatchItemImpl.builder() + .urn(entityUrn) + .entitySpec(_testEntityRegistry.getEntitySpec(DATASET_ENTITY_NAME)) + .aspectName(GLOBAL_TAGS_ASPECT_NAME) + .aspectSpec( + _testEntityRegistry + .getEntitySpec(DATASET_ENTITY_NAME) + .getAspectSpec(GLOBAL_TAGS_ASPECT_NAME)) + .patch( + GenericJsonPatch.builder() + .arrayPrimaryKeys(Map.of("properties", List.of("tag"))) + .patch(List.of(tagPatchOp(PatchOperationType.ADD, tag2))) + .build() + .getJsonPatch()) + .systemMetadata(patchSystemMetadata) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(_testEntityRegistry); + + // establish base entity + _entityServiceImpl.ingestAspects( + opContext, + AspectsBatchImpl.builder() + .retrieverContext(opContext.getRetrieverContext().get()) + .items(List.of(initialAspectTag1)) + .build(), + false, + true); + + _entityServiceImpl.ingestAspects( + opContext, + AspectsBatchImpl.builder() + .retrieverContext(opContext.getRetrieverContext().get()) + .items(List.of(patchAdd2, patchAdd2)) // duplicate + .build(), + false, + true); + + // List aspects urns + ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 1); + + assertEquals(batch.getStart().intValue(), 0); + assertEquals(batch.getCount().intValue(), 1); + assertEquals(batch.getTotal().intValue(), 1); + assertEquals(batch.getEntities().size(), 1); + assertEquals(entityUrn.toString(), batch.getEntities().get(0).toString()); + + EnvelopedAspect envelopedAspect = + _entityServiceImpl.getLatestEnvelopedAspect( + opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME); + assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "3", "Expected version 3"); + assertEquals( + new GlobalTags(envelopedAspect.getValue().data()) + .getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()), + Stream.concat(initialTags.stream().map(TagAssociation::getTag), Stream.of(tag2)) + .collect(Collectors.toSet()), + "Expected all tags"); + } + @Test public void dataGeneratorThreadingTest() { DataGenerator dataGenerator = new DataGenerator(opContext, _entityServiceImpl);