Skip to content

Commit

Permalink
feat(propagation): add new capabilities and guardrails
Browse files Browse the repository at this point in the history
  • Loading branch information
shirshanka committed Sep 29, 2024
1 parent e255961 commit 3a2d62f
Show file tree
Hide file tree
Showing 13 changed files with 1,155 additions and 221 deletions.
25 changes: 14 additions & 11 deletions .github/workflows/datahub-actions-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,17 @@ jobs:
env:
ENABLE_PUBLISH: ${{ secrets.ACRYL_DOCKER_PASSWORD }}
run: |
echo "Enable publish: ${{ env.ENABLE_PUBLISH != '' }}"
echo "publish=${{ env.ENABLE_PUBLISH != '' }}" >> "$GITHUB_OUTPUT"
if [[ -n "$ENABLE_PUBLISH" && "${{ github.repository }}" == "acryldata/datahub-actions" ]]; then
echo "Publishing is enabled"
echo "publish=true" >> "$GITHUB_OUTPUT"
else
echo "Publishing is not enabled"
echo "publish=false" >> "$GITHUB_OUTPUT"
fi
echo "Enable publish: ${{ env.ENABLE_PUBLISH != '' && github.repository == 'acryldata/datahub-actions' }}"
regular_image:
name: Build & Push Image to DockerHub
if: ${{ needs.setup.outputs.publish == 'true' }} # Only build the regular image if publishing is enabled
runs-on: ubuntu-latest
needs: setup
steps:
Expand Down Expand Up @@ -180,11 +187,9 @@ jobs:
- name: Load Docker image (if not publishing)
if: needs.setup.outputs.publish != 'true'
run: docker load < image.tar
- name: Download image (if publishing)
- name: Pull Docker image (if publishing)
if: needs.setup.outputs.publish == 'true'
uses: ishworkh/docker-image-artifact-download@v1
with:
image: acryldata/datahub-actions-slim:${{ needs.setup.outputs.unique_tag }}
run: docker pull acryldata/datahub-actions-slim:${{ needs.setup.outputs.unique_tag }}
- name: Run Trivy vulnerability scanner (slim)
uses: aquasecurity/trivy-action@master
env:
Expand All @@ -198,7 +203,7 @@ jobs:
ignore-unfixed: true
vuln-type: 'os,library'
- name: Upload Trivy scan results to GitHub Security tab (slim)
uses: github/codeql-action/upload-sarif@v2
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: 'trivy-results.sarif'
smoke_test:
Expand Down Expand Up @@ -233,11 +238,9 @@ jobs:
- name: Load Docker image (if not publishing)
if: needs.setup.outputs.publish != 'true'
run: docker load < image.tar
- name: Download image (if publishing)
- name: Pull Docker image (if publishing)
if: needs.setup.outputs.publish == 'true'
uses: ishworkh/docker-image-artifact-download@v1
with:
image: acryldata/datahub-actions-slim:${{ needs.setup.outputs.unique_tag }}
run: docker pull acryldata/datahub-actions-slim:${{ needs.setup.outputs.unique_tag }}
- name: run quickstart
env:
DATAHUB_TELEMETRY_ENABLED: false
Expand Down
62 changes: 44 additions & 18 deletions datahub-actions/src/datahub_actions/api/action_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,25 +176,51 @@ def query_ingestion_sources(self) -> List:
break
return sources

def get_downstreams(self, entity_urn: str) -> List[str]:
    """Return the URNs of entities directly downstream of *entity_urn*.

    Issues a single query to the GMS ``/relationships`` endpoint for
    INCOMING ``DownstreamOf`` edges. Returns an empty list when the
    endpoint reports no matching relationships.
    """
    encoded_urn = urllib.parse.quote(entity_urn)
    endpoint = (
        f"{self.graph._gms_server}/relationships"
        f"?direction=INCOMING&types=List(DownstreamOf)&urn={encoded_urn}"
    )
    payload = self.graph._get_generic(endpoint)
    if payload["count"] <= 0:
        return []
    return [rel["entity"] for rel in payload["relationships"]]
def get_downstreams(
    self, entity_urn: str, max_downstreams: int = 3000
) -> List[str]:
    """Return the URNs of entities directly downstream of *entity_urn*.

    Pages through the GMS ``/relationships`` endpoint (INCOMING
    ``DownstreamOf`` edges) until the server reports no more results or
    the *max_downstreams* guardrail is reached.

    Args:
        entity_urn: URN of the entity whose downstreams are requested.
        max_downstreams: upper bound on the number of URNs returned;
            acts as a guardrail against runaway fan-out.

    Returns:
        A list of downstream entity URNs, at most *max_downstreams* long.
    """
    count_per_page = 1000
    start = 0
    entities: List[str] = []
    while len(entities) < max_downstreams:
        url_frag = (
            f"/relationships?direction=INCOMING&types=List(DownstreamOf)"
            f"&urn={urllib.parse.quote(entity_urn)}"
            f"&count={count_per_page}&start={start}"
        )
        response = self.graph._get_generic(f"{self.graph._gms_server}{url_frag}")
        if response["count"] <= 0:
            # Empty page: nothing more to fetch.
            break
        entities.extend(x["entity"] for x in response["relationships"])
        start += count_per_page
        if start >= response["total"]:
            # All pages consumed.
            break
    # Trim in case the final page overshot the guardrail (e.g. when
    # max_downstreams is not a multiple of the page size).
    return entities[:max_downstreams]

def get_upstreams(self, entity_urn: str) -> List[str]:
    """Return the URNs of entities directly upstream of *entity_urn*.

    Issues a single query to the GMS ``/relationships`` endpoint for
    OUTGOING ``DownstreamOf`` edges. Returns an empty list when the
    endpoint reports no matching relationships.
    """
    encoded_urn = urllib.parse.quote(entity_urn)
    endpoint = (
        f"{self.graph._gms_server}/relationships"
        f"?direction=OUTGOING&types=List(DownstreamOf)&urn={encoded_urn}"
    )
    payload = self.graph._get_generic(endpoint)
    if payload["count"] <= 0:
        return []
    return [rel["entity"] for rel in payload["relationships"]]
def get_upstreams(self, entity_urn: str, max_upstreams: int = 3000) -> List[str]:
    """Return the URNs of entities directly upstream of *entity_urn*.

    Pages through the GMS ``/relationships`` endpoint (OUTGOING
    ``DownstreamOf`` edges) until the server reports no more results or
    the *max_upstreams* guardrail is reached.

    Args:
        entity_urn: URN of the entity whose upstreams are requested.
        max_upstreams: upper bound on the number of URNs returned;
            acts as a guardrail against runaway fan-in.

    Returns:
        A list of upstream entity URNs, at most *max_upstreams* long.
    """
    # NOTE(review): page size here (100) differs from get_downstreams
    # (1000) — presumably deliberate, since upstream fan-in is usually
    # small, but worth confirming.
    count_per_page = 100
    start = 0
    entities: List[str] = []
    while len(entities) < max_upstreams:
        url_frag = (
            f"/relationships?direction=OUTGOING&types=List(DownstreamOf)"
            f"&urn={urllib.parse.quote(entity_urn)}"
            f"&count={count_per_page}&start={start}"
        )
        response = self.graph._get_generic(f"{self.graph._gms_server}{url_frag}")
        if response["count"] <= 0:
            # Empty page: nothing more to fetch.
            break
        entities.extend(x["entity"] for x in response["relationships"])
        start += count_per_page
        if start >= response["total"]:
            # All pages consumed.
            break
    # Trim in case the final page overshot the guardrail (e.g. when
    # max_upstreams is not a multiple of the page size).
    return entities[:max_upstreams]

def get_relationships(
self, entity_urn: str, direction: str, relationship_types: List[str]
Expand Down
Loading

0 comments on commit 3a2d62f

Please sign in to comment.