Skip to content

Commit

Permalink
4.8.19 event cert re issue (#51)
Browse files Browse the repository at this point in the history
* 4.8.20 dev v1 (#48)

* 4.8.18 dev hot fix (#41)

* Added Local Cache for Storing Enrollment List Batches

* Add log to confirm batch retrieval from cache

* removed unused class (#42)

---------

Co-authored-by: saipradeep_ravipati <[email protected]>
Co-authored-by: shankaragoudab <[email protected]>

* added code for reissue certificates for events

* changed kafka topic

---------

Co-authored-by: Karthikeyan Rajendran <[email protected]>
Co-authored-by: saipradeep_ravipati <[email protected]>
Co-authored-by: shankaragoudab <[email protected]>
Co-authored-by: karthik-tarento <[email protected]>
Co-authored-by: anilkumar <[email protected]>

* added action type for event (#49)

Co-authored-by: anilkumar <[email protected]>

* update kafka msg for event (#50)

Co-authored-by: anilkumar <[email protected]>

---------

Co-authored-by: anilkumarkammalapalli <[email protected]>
Co-authored-by: saipradeep_ravipati <[email protected]>
Co-authored-by: shankaragoudab <[email protected]>
Co-authored-by: anilkumar <[email protected]>
  • Loading branch information
5 people authored Dec 9, 2024
1 parent 12a1e83 commit 552380b
Show file tree
Hide file tree
Showing 14 changed files with 282 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
public enum CourseActorOperations {
ISSUE_CERTIFICATE("issueCertificate"),
ADD_BATCH_CERTIFICATE("addCertificateToCourseBatch"),
DELETE_BATCH_CERTIFICATE("removeCertificateFromCourseBatch");
DELETE_BATCH_CERTIFICATE("removeCertificateFromCourseBatch"),
ISSUE_EVENT_CERTIFICATE("issueEventCertificate");

private String value;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package org.sunbird.learner.actors.certificate.service;

import java.text.MessageFormat;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;

import org.apache.commons.collections.CollectionUtils;
import org.sunbird.actor.base.BaseActor;
import org.sunbird.common.exception.ProjectCommonException;
Expand All @@ -17,6 +16,7 @@
import org.sunbird.common.request.Request;
import org.sunbird.common.responsecode.ResponseCode;
import org.sunbird.kafka.client.InstructionEventGenerator;
import org.sunbird.kafka.client.KafkaClient;
import org.sunbird.learner.constants.CourseJsonKey;
import org.sunbird.learner.constants.InstructionEvent;
import org.sunbird.learner.util.CourseBatchUtil;
Expand Down Expand Up @@ -48,6 +48,9 @@ public void onReceive(Request request) throws Throwable {
case "issueCertificate":
issueCertificate(request);
break;
case "issueEventCertificate":
issueEventCertificate(request);
break;
default:
onReceiveUnsupportedOperation(request.getOperation());
break;
Expand Down Expand Up @@ -148,4 +151,94 @@ private void pushInstructionEvent(
String topic = ProjectUtil.getConfigValue("kafka_topics_certificate_instruction");
InstructionEventGenerator.pushInstructionEvent(batchId, topic, data);
}

private void issueEventCertificate(Request request) {
logger.info(request.getRequestContext(), "issueEventCertificate request=" + request.getRequest());
final String batchId = (String) request.getRequest().get(JsonKey.BATCH_ID);
final String eventId = (String) request.getRequest().get(JsonKey.EVENT_ID);
List<String> userIds = (List<String>) request.getRequest().get(JsonKey.USER_IDs);
final boolean reIssue = isReissue(request.getContext().get(CourseJsonKey.REISSUE));
Map<String, Object> courseBatchResponse =
CourseBatchUtil.validateEventBatch(request.getRequestContext(), eventId, batchId);
if (null == courseBatchResponse.get("cert_templates")) {
ProjectCommonException.throwClientErrorException(
ResponseCode.CLIENT_ERROR, "No certificate templates associated with " + batchId);
}
Response response = new Response();
Map<String, Object> resultData = new HashMap<>();
resultData.put(
JsonKey.STATUS, MessageFormat.format(ResponseMessage.SUBMITTED.getValue(), batchId));
resultData.put(JsonKey.BATCH_ID, batchId);
resultData.put(JsonKey.EVENT_ID, eventId);
resultData.put(JsonKey.COLLECTION_ID, eventId);
response.put(JsonKey.RESULT, resultData);
try {
pushCertificateGenerateKafkaTopic(userIds,eventId,batchId,100.0,reIssue);
} catch (Exception e) {
logger.error(request.getRequestContext(), "issueCertificate pushInstructionEvent error for eventId="
+ eventId + ", batchId=" + batchId, e);
resultData.put(
JsonKey.STATUS, MessageFormat.format(ResponseMessage.FAILED.getValue(), batchId));
}
sender().tell(response, self());
}

public void pushCertificateGenerateKafkaTopic(List<String> userIds, String eventId, String batchId, double completionPercentage, boolean reIssue) {
long now = System.currentTimeMillis();

String userIdsJson = userIds.stream()
.map(id -> "\"" + id + "\"")
.collect(Collectors.joining(", ", "[", "]"));

String event = String.format(
"{"
+ "\"actor\":{"
+ " \"id\": \"Issue Certificate Generator\","
+ " \"type\": \"System\""
+ "},"
+ "\"context\":{"
+ " \"pdata\":{"
+ " \"version\": \"1.0\","
+ " \"id\": \"org.sunbird.learning.platform\""
+ " }"
+ "},"
+ "\"edata\": {"
+ " \"action\": \"issue-event-certificate\","
+ " \"batchId\": \"%s\","
+ " \"eventId\": \"%s\","
+ " \"userIds\": %s,"
+ " \"eventCompletionPercentage\": %.2f%s"
+ "},"
+ "\"eid\": \"BE_JOB_REQUEST\","
+ "\"ets\": %d,"
+ "\"mid\": \"EVENT.%s\","
+ "\"object\": {"
+ " \"id\": \"batch_%s\","
+ " \"type\": \"IssueCertificate\""
+ "}"
+ "}",
batchId,
eventId,
userIdsJson,
completionPercentage,
reIssue ? ",\"reIssue\": true" : "",
now,
UUID.randomUUID().toString(),
batchId
);


String topic = ProjectUtil.getConfigValue("user_issue_certificate_for_event");
try {
KafkaClient.send(String.join(",", userIds), event, topic);
} catch (Exception e) {
throw new ProjectCommonException(
"BE_JOB_REQUEST_EXCEPTION",
"Invalid topic id.",
ResponseCode.CLIENT_ERROR.getResponseCode()
);
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/** */
package org.sunbird.learner.util;

import org.sunbird.common.models.util.JsonKey;
import org.sunbird.common.models.util.LoggerUtil;
import org.sunbird.common.models.util.PropertiesCache;

import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* This class will handle the data cache.
*
* @author Amit Kumar
*/
public class BatchCacheHandler implements Runnable {

private static Map<String, Object> batchMap = new ConcurrentHashMap<>();

private LoggerUtil logger = new LoggerUtil(BatchCacheHandler.class);

@Override
public void run() {
logger.info(null, "BatchCacheHandler:run: Cache refresh started.");
cache(batchMap);
logger.info(null, "BatchCacheHandler:run: Cache refresh completed.");
}

@SuppressWarnings("unchecked")
private void cache(Map<String, Object> map) {
try {
Map contents = ContentUtil.getAllBatches(Integer.parseInt(PropertiesCache.getInstance()
.getProperty(JsonKey.PAGE_SIZE_CONTENT_FETCH)));
batchMap.putAll(contents);
logger.debug(null, "content keyset " + map.keySet());
logger.info(null, " cache size: " + map.size());
} catch (Exception e) {
logger.error(null, "ContentCacheHandler:cache: Exception in retrieving content section " + e.getMessage(), e);
}
}

/** @return the contentCache */
public static Map<String, Object> getBatchMap() {
return batchMap;
}

public static Map<String, Object> getBatch(String id) {
Map<String, Object> obj = (Map<String, Object>) batchMap.get(id);
if(obj != null)
return obj;
else{
batchMap.putAll(ContentUtil.getAllBatches(Arrays.asList(id),Integer.parseInt(PropertiesCache.getInstance()
.getProperty(JsonKey.PAGE_SIZE_CONTENT_FETCH))));
return (Map<String, Object>) batchMap.get(id);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.sunbird.common.Constants;
import org.sunbird.common.ElasticSearchHelper;
import org.sunbird.common.exception.ProjectCommonException;
import org.sunbird.common.factory.EsClientFactory;
Expand Down Expand Up @@ -264,6 +263,22 @@ public static boolean getContentRead(String courseId, Map<String, String> allHea
}
return flag;
}
public static Map<String, Object> getAllBatches(List identifierList,int pageSize) {
//int recordStart = 0;
int remainingRecords;
Map<String, Object> allRecords = new HashMap<>();
do {
Map.Entry<Integer, Map<String, Map<String, Object>>> contentsResult = batches(identifierList,allRecords.size(), pageSize);
int count = contentsResult.getKey();
Map<String, Map<String, Object>> batchMap = contentsResult.getValue();
allRecords.putAll(batchMap);
// Update remaining records and move to the next page if needed
remainingRecords = count - allRecords.size();
// recordStart = allRecords.size() - 1;
} while (remainingRecords > 0);

return allRecords;
}
public static Map<String, Object> getAllContent(List identifierList,int pageSize) {
//int recordStart = 0;
int remainingRecords;
Expand All @@ -280,9 +295,40 @@ public static Map<String, Object> getAllContent(List identifierList,int pageSize

return allRecords;
}
public static Map<String, Object> getAllBatches(int pageSize) {
return getAllBatches(null, pageSize);
}
public static Map<String, Object> getAllContent(int pageSize) {
return getAllContent(null, pageSize);
}

public static Map.Entry<Integer, Map<String, Map<String, Object>>> batches(List identifierList,int offset, int limit) {
SearchDTO searchDTO = new SearchDTO();
searchDTO.setOffset(offset);
searchDTO.setLimit(limit);
HashMap sort = new HashMap();
sort.put("createdDate","asc");
searchDTO.setSortBy(sort);
HashMap filters = new java.util.HashMap<String, Object>();
if(identifierList != null && identifierList.size() > 0)
filters.put(JsonKey.BATCH_ID,identifierList);
searchDTO.getAdditionalProperties().put(JsonKey.FILTERS, filters);
Future<Map<String, Object>> resultFuture = EsClientFactory.getInstance(JsonKey.REST).search(null,searchDTO, ProjectUtil.EsType.courseBatch.getTypeName(),isCotentElasticSearchTypeDoc());
HashMap result= (HashMap<String,Object>) ElasticSearchHelper.getResponseFromFuture(resultFuture);
Long longCount = (Long) result.getOrDefault(JsonKey.COUNT, 0L);
int count = longCount.intValue();
List<Map<String, Object>> batchesList = (List<Map<String, Object>>) result.getOrDefault(JsonKey.CONTENT, new ArrayList<>());
Map<String, Map<String, Object>> batchesMap = new HashMap<>();
if (CollectionUtils.isNotEmpty(batchesList)) {
for (Map<String, Object> batch : batchesList) {
String batchId = (String) batch.get(JsonKey.IDENTIFIER);
if(null != batchId && !batchId.isEmpty())
batchesMap.put(batchId, batch);
}
}
return new AbstractMap.SimpleEntry<>(count, batchesMap);
}

public static Map.Entry<Integer, Map<String, Map<String, Object>>> contents(List identifierList,int offset, int limit) {
SearchDTO searchDTO = new SearchDTO();
searchDTO.setOffset(offset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,4 +287,21 @@ public static Map<String, Object> esEventMapping(EventBatch eventBatch, String p
return esCourseMap;
}

public static Map<String, Object> validateEventBatch(RequestContext requestContext, String eventId, String batchId) {
Future<Map<String, Object>> resultF =
esUtil.getDataByIdentifier(requestContext, EsType.eventBatch.getTypeName(), batchId);
Map<String, Object> result =
(Map<String, Object>) ElasticSearchHelper.getResponseFromFuture(resultF);
if (MapUtils.isEmpty(result)) {
ProjectCommonException.throwClientErrorException(
ResponseCode.CLIENT_ERROR, "No such batchId exists");
}
if (StringUtils.isNotBlank(eventId)
&& !StringUtils.equals(eventId, (String) result.get(JsonKey.EVENT_ID))) {
ProjectCommonException.throwClientErrorException(
ResponseCode.CLIENT_ERROR, "batchId is not linked with eventId");
}
return result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public static void schedule() {
service.scheduleWithFixedDelay(new DataCacheHandler(), 0, PAGE_DATA_TTL, TimeUnit.HOURS);
service.scheduleWithFixedDelay(new PageCacheLoaderService(), 0, PAGE_DATA_TTL, TimeUnit.HOURS);
service.scheduleWithFixedDelay(new ContentCacheHandler(), 0, 15, TimeUnit.MINUTES);
service.scheduleWithFixedDelay(new BatchCacheHandler(), 0, 15, TimeUnit.MINUTES);

logger.info(null,
"SchedulerManager:schedule: Started scheduler job for cache refresh.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.sunbird.common.models.util.{JsonKey, ProjectUtil}
import org.sunbird.common.request.RequestContext
import org.sunbird.dto.SearchDTO
import org.sunbird.helper.ServiceFactory
import org.sunbird.learner.util.BatchCacheHandler

abstract class BaseEnrolmentActor extends BaseActor {

Expand All @@ -28,6 +29,8 @@ abstract class BaseEnrolmentActor extends BaseActor {
response.getOrDefault(JsonKey.CONTENT, new java.util.ArrayList[util.Map[String, AnyRef]]).asInstanceOf[util.List[util.Map[String, AnyRef]]]
}



def getBatchesV2(requestContext: RequestContext, batchId: String, courseId: String, requestedFields: java.util.List[String]): java.util.List[java.util.Map[String, AnyRef]] = {
val filters = new java.util.HashMap[String, AnyRef]() {
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import java.util.{Calendar, Collections, Comparator, Date, TimeZone, UUID}
import akka.actor.ActorRef
import com.fasterxml.jackson.databind.ObjectMapper
import org.sunbird.common.models.util.JsonKey
import org.sunbird.learner.util.Util
import org.sunbird.learner.util.{BatchCacheHandler, ContentCacheHandler, ContentSearchUtil, ContentUtil, CourseBatchSchedulerUtil, JsonUtil, Util}

import scala.collection.JavaConverters._
import javax.inject.{Inject, Named}
Expand All @@ -24,7 +24,6 @@ import org.sunbird.common.responsecode.ResponseCode
import org.sunbird.learner.actors.coursebatch.dao.impl.{BatchUserDaoImpl, CourseBatchDaoImpl, UserCoursesDaoImpl}
import org.sunbird.learner.actors.coursebatch.dao.{BatchUserDao, CourseBatchDao, UserCoursesDao}
import org.sunbird.learner.actors.group.dao.impl.GroupDaoImpl
import org.sunbird.learner.util.{ContentCacheHandler, ContentSearchUtil, ContentUtil, CourseBatchSchedulerUtil, JsonUtil, Util}
import org.sunbird.models.course.batch.CourseBatch
import org.sunbird.models.user.courses.UserCourses
import org.sunbird.cache.util.RedisCacheUtil
Expand Down Expand Up @@ -280,11 +279,18 @@ class CourseEnrolmentActor @Inject()(@Named("course-batch-notification-actor") c
new ObjectMapper().writeValueAsString(searchRequest)
}

def addBatchDetails(enrolmentList: util.List[util.Map[String, AnyRef]], request: Request): util.List[util.Map[String, AnyRef]] = {
def addBatchDetails(enrolmentList: util.List[util.Map[String, AnyRef]], request: Request,version:String): util.List[util.Map[String, AnyRef]] = {
val batchIds:java.util.List[String] = enrolmentList.map(e => e.getOrDefault(JsonKey.BATCH_ID, "").asInstanceOf[String]).distinct.filter(id => StringUtils.isNotBlank(id)).toList.asJava
val batchDetails = new java.util.ArrayList[java.util.Map[String, AnyRef]]();
val searchIdentifierMaxSize = Integer.parseInt(ProjectUtil.getConfigValue(JsonKey.SEARCH_IDENTIFIER_MAX_SIZE));
if (batchIds.size() > searchIdentifierMaxSize) {
if(JsonKey.VERSION_2.equalsIgnoreCase(version) &&
JsonKey.TRUE.equalsIgnoreCase(ProjectUtil.getConfigValue(JsonKey.ENROLLMENT_LIST_CACHE_BATCH_FETCH_ENABLED))){
logger.info(request.getRequestContext, "Retrieving batch details from the local cache");
for (i <- 0 to batchIds.size()-1) {
batchDetails.add(getBatchFrmLocalCache(batchIds.get(i)))
}
}
else if (batchIds.size() > searchIdentifierMaxSize) {
for (i <- 0 to batchIds.size() by searchIdentifierMaxSize) {
val batchIdsSubList: java.util.List[String] = batchIds.subList(i, Math.min(batchIds.size(), i + searchIdentifierMaxSize));
batchDetails.addAll(searchBatchDetails(batchIdsSubList, request))
Expand Down Expand Up @@ -541,7 +547,7 @@ class CourseEnrolmentActor @Inject()(@Named("course-batch-notification-actor") c
//if ("v2".equals(version))
// addBatchDetails_v2(updatedEnrolmentList, request)
//else
addBatchDetails(updatedEnrolmentList, request)
addBatchDetails(updatedEnrolmentList, request,version.asInstanceOf[String])

} else new java.util.ArrayList[java.util.Map[String, AnyRef]]()
}
Expand Down Expand Up @@ -580,6 +586,14 @@ class CourseEnrolmentActor @Inject()(@Named("course-batch-notification-actor") c
courseContent
}

def getBatchFrmLocalCache(batchId: String): java.util.Map[String, AnyRef] = {
val batchesMap = BatchCacheHandler.getBatchMap.asInstanceOf[java.util.Map[String, java.util.Map[String, AnyRef]]]
var batch = batchesMap.get(batchId)
if (batch == null || batch.size() < 1)
batch = BatchCacheHandler.getBatch(batchId)
batch
}

def isCourseEligible(enrolment: java.util.Map[String, AnyRef]): Boolean = {
val courseContent = getCourseContent(enrolment.get(JsonKey.COURSE_ID).asInstanceOf[String])
if (null == courseContent || (!JsonKey.LIVE.equalsIgnoreCase(courseContent.get(JsonKey.STATUS).asInstanceOf[String])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1177,5 +1177,9 @@ public final class JsonKey {
public static final String MEETING_LINK_URL = "meeting_link_url";
public static final String MEETING_LINK = "meetingLink";
public static final String KARMAYOGI_SAPTAH_END_DATE = "karmayogi_saptah_end_date";
public static final String ENROLLMENT_LIST_CACHE_BATCH_FETCH_ENABLED ="use_cache_for_enrollment_list_batch_fetch";
public static final String TRUE="true";
public static final String EVENT_COMPLETION_PERCENTAGE="eventCompletionPercentage";
public static final String ISSUE_EVENT_CERTIFICATE = "issue-event-certificate";
private JsonKey() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,6 @@ event.update.url=/event/v4/update/
user_issue_certificate_for_event =dev.issue.certificate.request
dashboard_user_event_state=dashboard.user.state
meeting_link_url=https://teams.microsoft.com/l/meetup-join/19%3ameeting_M2Y3ZDE2ZDMtMWQwYS00OWQzLWE3NDctNDRkNTdjOGI4Yzll%40thread.v2/0?context=%7b%22Tid%22%3a%2240cfb65c-9b71-435f-8bc2-bc2c69df1aca%22%2c%22Oid%22%3a%22cbd37bc9-5c33-401f-b590-9decb3c370f8%22%7d
karmayogi_saptah_end_date=2024-10-31
karmayogi_saptah_end_date=2024-10-31
use_cache_for_enrollment_list_batch_fetch=true
kafka_topics_event_certificate_instruction=local.event.issue.certificate.request
Loading

0 comments on commit 552380b

Please sign in to comment.