Skip to content

Commit

Permalink
Fix bulkUpload and adding the key value (#519)
Browse files Browse the repository at this point in the history
* Fix bulkUpload and adding the key value

* Adding the kafka logic update and redis key changes

* Review comment code
  • Loading branch information
Sahil-tarento authored Apr 1, 2024
1 parent 0406353 commit 47a7587
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 10 deletions.
9 changes: 9 additions & 0 deletions src/main/java/org/sunbird/cache/RedisCacheMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -205,4 +205,13 @@ public String getContentFromCache(String key) {
return null;
}
}

public Boolean isKeyExist(String key) {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.exists(Constants.REDIS_COMMON_KEY + key);
} catch (Exception e) {
logger.error(e);
return null;
}
}
}
11 changes: 11 additions & 0 deletions src/main/java/org/sunbird/core/producer/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,15 @@ public void push(String topic, Object value) {
log.error(e);
}
}

public void pushWithKey(String topic, Object value, String key) {
ObjectMapper mapper = new ObjectMapper();
String message = null;
try {
message = mapper.writeValueAsString(value);
kafkaTemplate.send(topic, key, message);
} catch (JsonProcessingException e) {
log.error(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ public SBApiResponse bulkUpload(MultipartFile mFile, String orgId, String channe
response.getResult().putAll(uploadedFile);
uploadedFile.put(Constants.ORG_NAME, channel);
uploadedFile.put(Constants.X_AUTH_TOKEN, userAuthToken);
kafkaProducer.push(serverConfig.getUserBulkUploadTopic(), uploadedFile);
kafkaProducer.pushWithKey(serverConfig.getUserBulkUploadTopic(), uploadedFile, orgId);
sendBulkUploadNotification(orgId, channel, (String) uploadResponse.getResult().get(Constants.URL));
} catch (Exception e) {
setErrorData(response,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.sunbird.cache.RedisCacheMgr;
import org.sunbird.cassandra.utils.CassandraOperation;
import org.sunbird.common.model.SBApiResponse;
import org.sunbird.common.util.CbExtServerProperties;
Expand Down Expand Up @@ -45,6 +46,9 @@ public class UserBulkUploadService {
@Autowired
StorageService storageService;

@Autowired
RedisCacheMgr redisCacheMgr;

public void initiateUserBulkUploadProcess(String inputData) {
logger.info("UserBulkUploadService:: initiateUserBulkUploadProcess: Started");
long duration = 0;
Expand Down Expand Up @@ -251,16 +255,22 @@ private void processBulkUpload(HashMap<String, String> inputDataMap) throws IOEx
} else {
invalidErrList.addAll(validateEmailContactAndDomain(userRegistration));
if (invalidErrList.isEmpty()) {
userRegistration.setUserAuthToken(inputDataMap.get(Constants.X_AUTH_TOKEN));
String responseCode = userUtilityService.createBulkUploadUser(userRegistration);
if (!responseCode.equalsIgnoreCase(Constants.OK)) {
failedRecordsCount++;
statusCell.setCellValue(Constants.FAILED_UPPERCASE);
errorDetails.setCellValue(responseCode);
if (redisCacheMgr.isKeyExist(userRegistration.getPhone()+ "" +userRegistration.getEmail())) {
logger.error("Key is already present in Redis Key: " + userRegistration.getPhone()+ "" +userRegistration.getEmail());
} else {
noOfSuccessfulRecords++;
statusCell.setCellValue(Constants.SUCCESS_UPPERCASE);
errorDetails.setCellValue("");
redisCacheMgr.putCache(userRegistration.getPhone().trim()+ "" +userRegistration.getEmail(),
"");
userRegistration.setUserAuthToken(inputDataMap.get(Constants.X_AUTH_TOKEN));
String responseCode = userUtilityService.createBulkUploadUser(userRegistration);
if (!Constants.OK.equalsIgnoreCase(responseCode)) {
failedRecordsCount++;
statusCell.setCellValue(Constants.FAILED_UPPERCASE);
errorDetails.setCellValue(responseCode);
} else {
noOfSuccessfulRecords++;
statusCell.setCellValue(Constants.SUCCESS_UPPERCASE);
errorDetails.setCellValue("");
}
}
} else {
failedRecordsCount++;
Expand Down

0 comments on commit 47a7587

Please sign in to comment.