Skip to content

Commit

Permalink
Merge pull request #6 from wilkysingh-tarento/ApacheFLinkForRatingFinal
Browse files Browse the repository at this point in the history
Apache flink for rating final
  • Loading branch information
wilkysingh-tarento authored Apr 1, 2022
2 parents 3648990 + c4f886a commit 67bba44
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
include "base-config.conf"

kafka {
input.topic = "rating"
output.topic = "failedRating"
#input.topic = "rating"
#output.topic = "failedRating"

output.topic = ${job.env}."rating.failedRating"
input.topic = ${job.env}."dev.rating.event"
groupId = ${job.env}"-rating-group"

}

task {
Expand Down
6 changes: 6 additions & 0 deletions data-pipeline-flink/sunbird-dp-distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@
<version>1.0.0</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.sunbird.dp.jobs</groupId>
<artifactId>rating</artifactId>
<version>1.0.0</version>
<type>jar</type>
</dependency>
</dependencies>

<build>
Expand Down
19 changes: 19 additions & 0 deletions kubernetes/ansible/roles/flink-jobs-deploy/defaults/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ summary_denorm_operators_parallelism: 1
summary_denorm_duplication_key_expiry_seconds: 3600
summary_denorm_key_expiry_seconds: 3600

### rating related vars
rating_consumer_parallelism: 1
rating_operators_parallelism: 1
rating_key_expiry_seconds: 3600


### Druid-validator related vars
druid_validator_consumer_parallelism: 1
druid_validator_operators_parallelism: 1
Expand Down Expand Up @@ -198,6 +204,19 @@ flink_job_names:
scale_target_value: 500000
min_replica: 1
max_replica: 2
rating:
job_class_name: 'org.sunbird.dp.rating.task.RatingTask'
replica: 1
jobmanager_memory: 1024m
taskmanager_memory: 1024m
taskmanager_process_memory: 1700m
jobmanager_process_memory: 1600m
taskslots: 1
cpu_requests: 0.3
scale_enabled: true
scale_target_value: 500000
min_replica: 1
max_replica: 2
de-normalization:
job_class_name: 'org.sunbird.dp.denorm.task.DenormalizationStreamTask'
replica: 1
Expand Down
59 changes: 59 additions & 0 deletions kubernetes/helm_charts/datapipeline_jobs/values.j2
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ scale_properties:
scale_target_value: {{ flink_job_names['device-profile-updater'].scale_target_value | default(0) }}
min_replica: {{ flink_job_names['device-profile-updater'].min_replica | default(1) }}
max_replica: {{ flink_job_names['device-profile-updater'].max_replica | default(2) }}
rating:
enabled: {{ flink_job_names['rating'].scale_enabled | lower}}
scale_target_value: {{ flink_job_names['rating'].scale_target_value | default(0) }}
min_replica: {{ flink_job_names['rating'].min_replica | default(1) }}
max_replica: {{ flink_job_names['rating'].max_replica | default(2) }}
ingest-router:
enabled: {{ flink_job_names['ingest-router'].scale_enabled | lower}}
scale_target_value: {{ flink_job_names['ingest-router'].scale_target_value }}
Expand Down Expand Up @@ -551,6 +556,60 @@ summary-denormalization:
taskmanager.memory.process.size: {{ flink_job_names['summary-denormalization'].taskmanager_process_memory }}
jobmanager.memory.process.size: {{ flink_job_names['summary-denormalization'].jobmanager_process_memory }}

rating:
rating: |+
include file("/data/flink/conf/base-config.conf")
kafka {
input.topic = {{ env_name }}.rating.event
output.topic = {{ env_name }}.rating.failedRating
groupId = {{ env_name }}-rating-group
}
skip.events = ["INTERRUPT"]
permit.eid=["AUDIT"]
task {
consumer.parallelism = {{ rating_consumer_parallelism }}
downstream.operators.parallelism = {{ rating_operators_parallelism }}
}
redis {
database {
duplicationstore.id = 9
key.expiry.seconds = {{ rating_key_expiry_seconds }}
}
}
# redis-metadata
redis-meta {
database {
devicestore.id = 2
userstore.id = 12
contentstore.id = 5
dialcodestore.id = 6
key.expiry.seconds = {{ rating_key_expiry_seconds }}
}
content.host = {{ redis_meta_content_host }}
device.host = {{ redis_meta_device_host }}
user.host = {{ redis_meta_user_host }}
dialcode.host = {{ redis_meta_dialcode_host }}
content.port = {{ redis_meta_content_port }}
device.port = {{ redis_meta_device_port }}
user.port = {{ redis_meta_user_port }}
dialcode.port = {{ redis_meta_dialcode_port }}
}
# config version
user_denorm_version = v2

flink-conf: |+
jobmanager.memory.flink.size: {{ flink_job_names['rating'].jobmanager_memory }}
taskmanager.memory.flink.size: {{ flink_job_names['rating'].taskmanager_memory }}
taskmanager.numberOfTaskSlots: {{ flink_job_names['rating'].taskslots }}
parallelism.default: 1
jobmanager.execution.failover-strategy: region
taskmanager.memory.network.fraction: 0.1
scheduler-mode: reactive
heartbeat.timeout: 8000
heartbeat.interval: 5000
taskmanager.memory.process.size: {{ flink_job_names['rating'].taskmanager_process_memory }}
jobmanager.memory.process.size: {{ flink_job_names['rating'].jobmanager_process_memory }}

druid-validator:
druid-validator: |+
include file("/data/flink/conf/base-config.conf")
Expand Down

0 comments on commit 67bba44

Please sign in to comment.