Skip to content

Commit

Permalink
provide more granular way to manage embedding cache
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan-kripakov-m10 committed Jan 21, 2024
1 parent 557c19c commit f205da2
Show file tree
Hide file tree
Showing 14 changed files with 689 additions and 91 deletions.
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
package com.exadel.frs.core.trainservice.cache;

import com.exadel.frs.commonservice.entity.Embedding;
import com.exadel.frs.commonservice.projection.EmbeddingProjection;
import com.exadel.frs.core.trainservice.dto.CacheActionDto;
import com.exadel.frs.core.trainservice.dto.CacheActionDto.AddEmbeddings;
import com.exadel.frs.core.trainservice.dto.CacheActionDto.CacheAction;
import com.exadel.frs.core.trainservice.dto.CacheActionDto.RemoveEmbeddings;
import com.exadel.frs.core.trainservice.dto.CacheActionDto.RemoveSubjects;
import com.exadel.frs.core.trainservice.dto.CacheActionDto.RenameSubjects;
import com.exadel.frs.core.trainservice.service.EmbeddingService;
import com.exadel.frs.core.trainservice.service.NotificationSenderService;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import static com.exadel.frs.core.trainservice.system.global.Constants.SERVER_UUID;

Expand All @@ -34,34 +42,81 @@ public class EmbeddingCacheProvider {
.build();

public EmbeddingCollection getOrLoad(final String apiKey) {

var result = cache.getIfPresent(apiKey);

if (result == null) {
result = embeddingService.doWithEnhancedEmbeddingProjectionStream(apiKey, EmbeddingCollection::from);

cache.put(apiKey, result);

notifyCacheEvent("UPDATE", apiKey);
}

return result;
}

public void ifPresent(String apiKey, Consumer<EmbeddingCollection> consumer) {
public void removeEmbedding(String apiKey, EmbeddingProjection embedding) {
Optional.ofNullable(cache.getIfPresent(apiKey))
.ifPresent(consumer);
.ifPresent(
ec -> {
ec.removeEmbedding(embedding);
notifyCacheEvent(
CacheAction.REMOVE_EMBEDDINGS,
apiKey,
new RemoveEmbeddings(Map.of(embedding.subjectName(), List.of(embedding.embeddingId())))
);
}
);
}

cache.getIfPresent(apiKey);
notifyCacheEvent("UPDATE", apiKey);
public void updateSubjectName(String apiKey, String oldSubjectName, String newSubjectName) {
Optional.ofNullable(cache.getIfPresent(apiKey))
.ifPresent(
ec -> {
ec.updateSubjectName(oldSubjectName, newSubjectName);
notifyCacheEvent(CacheAction.RENAME_SUBJECTS, apiKey, new RenameSubjects(Map.of(oldSubjectName, newSubjectName)));
}
);
}

public void removeBySubjectName(String apiKey, String subjectName) {
Optional.ofNullable(cache.getIfPresent(apiKey))
.ifPresent(
ec -> {
ec.removeEmbeddingsBySubjectName(subjectName);
notifyCacheEvent(CacheAction.REMOVE_SUBJECTS, apiKey, new RemoveSubjects(List.of(subjectName)));
}
);
}


public void addEmbedding(String apiKey, Embedding embedding) {
Optional.ofNullable(cache.getIfPresent(apiKey))
.ifPresent(
ec -> {
ec.addEmbedding(embedding);
notifyCacheEvent(CacheAction.ADD_EMBEDDINGS, apiKey, new AddEmbeddings(List.of(embedding.getId())));
}
);
}

/**
* Method can be used to make changes in cache without sending notification.
* Use it carefully, because changes you do will not be visible for other compreface-api instances
*
* @param apiKey domain
* @param action what to do with {@link EmbeddingCollection}
*/
public void expose(String apiKey, Consumer<EmbeddingCollection> action) {
Optional.ofNullable(cache.getIfPresent(apiKey))
.ifPresent(action);
}

public void invalidate(final String apiKey) {
cache.invalidate(apiKey);
notifyCacheEvent("DELETE", apiKey);
notifyCacheEvent(CacheAction.INVALIDATE, apiKey, null);
}


/**
* @deprecated
* See {@link com.exadel.frs.core.trainservice.service.NotificationHandler#handleUpdate(CacheActionDto)}
*/
@Deprecated(forRemoval = true)
public void receivePutOnCache(String apiKey) {
var result = embeddingService.doWithEnhancedEmbeddingProjectionStream(apiKey, EmbeddingCollection::from);
cache.put(apiKey, result);
Expand All @@ -71,8 +126,8 @@ public void receiveInvalidateCache(final String apiKey) {
cache.invalidate(apiKey);
}

private void notifyCacheEvent(String event, String apiKey) {
CacheActionDto cacheActionDto = new CacheActionDto(event, apiKey, SERVER_UUID);
private <T> void notifyCacheEvent(CacheAction event, String apiKey, T action) {
CacheActionDto<T> cacheActionDto = new CacheActionDto<>(event, apiKey, SERVER_UUID, action);
notificationSenderService.notifyCacheChange(cacheActionDto);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ public Collection<String> getSubjectNames(final String apiKey) {
return subjectRepository.getSubjectNames(apiKey);
}

public List<Embedding> loadAllEmbeddingsByIds(Iterable<UUID> ids) {
return embeddingRepository.findByIdIn(ids);
}

@Transactional
public Subject deleteSubjectByName(final String apiKey, final String subjectName) {
final Optional<Subject> subjectOptional = subjectRepository.findByApiKeyAndSubjectNameIgnoreCase(apiKey, subjectName);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,76 @@
package com.exadel.frs.core.trainservice.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CacheActionDto {
@JsonIgnoreProperties(ignoreUnknown = true) // here and below "ignoreUnknown = true" for backward compatibility
public class CacheActionDto<T> {
private CacheAction cacheAction;
private String apiKey;
@JsonProperty("uuid")
private UUID serverUUID;
private T payload;

@JsonProperty("cacheAction")
private String cacheAction;
public <S> CacheActionDto<S> withPayload(S newPayload) {
return new CacheActionDto<>(
cacheAction,
apiKey,
serverUUID,
newPayload
);
}

@JsonProperty("apiKey")
private String apiKey;
public enum CacheAction {
// UPDATE and DELETE stays here to support rolling update
@Deprecated
UPDATE,
@Deprecated
DELETE,
REMOVE_EMBEDDINGS,
REMOVE_SUBJECTS,
ADD_EMBEDDINGS,
RENAME_SUBJECTS,
INVALIDATE
}

@JsonProperty("uuid")
private String serverUUID;
@Data
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public static class RemoveEmbeddings {
private Map<String, List<UUID>> embeddings;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public static class RemoveSubjects {
private List<String> subjects;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public static class AddEmbeddings {
private List<UUID> embeddings;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public static class RenameSubjects {
private Map<String, String> subjectsNamesMapping;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package com.exadel.frs.core.trainservice.service;

import com.exadel.frs.commonservice.projection.EmbeddingProjection;
import com.exadel.frs.core.trainservice.cache.EmbeddingCacheProvider;
import com.exadel.frs.core.trainservice.dto.CacheActionDto;
import com.exadel.frs.core.trainservice.dto.CacheActionDto.AddEmbeddings;
import com.exadel.frs.core.trainservice.dto.CacheActionDto.RemoveEmbeddings;
import com.exadel.frs.core.trainservice.dto.CacheActionDto.RemoveSubjects;
import com.exadel.frs.core.trainservice.dto.CacheActionDto.RenameSubjects;
import java.util.Objects;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
public class NotificationHandler {
private final EmbeddingCacheProvider cacheProvider;
private final SubjectService subjectService;

public void removeEmbeddings(CacheActionDto<RemoveEmbeddings> action) {
action.getPayload().getEmbeddings()
.entrySet()
.stream()
.filter(e -> StringUtils.isNotBlank(e.getKey()))
.filter(e -> Objects.nonNull(e.getValue()))
.filter(e -> !e.getValue().isEmpty())
.flatMap(e -> e.getValue().stream().filter(Objects::nonNull).map(id -> new EmbeddingProjection(id, e.getKey())))
.forEach(
em -> cacheProvider.expose(
action.getApiKey(),
c -> c.removeEmbedding(em)
)
);
}

public void removeSubjects(CacheActionDto<RemoveSubjects> action) {
action.getPayload().getSubjects()
.stream()
.filter(StringUtils::isNotBlank)
.forEach(
s -> cacheProvider.expose(
action.getApiKey(),
c -> c.removeEmbeddingsBySubjectName(s)
)
);
}


public void addEmbeddings(CacheActionDto<AddEmbeddings> action) {
var filtered = action.getPayload().getEmbeddings()
.stream()
.filter(Objects::nonNull)
.toList();
subjectService.loadEmbeddingsById(filtered)
.forEach(
em -> cacheProvider.expose(
action.getApiKey(),
c -> c.addEmbedding(em)
)
);
}

public void renameSubjects(CacheActionDto<RenameSubjects> action) {
action.getPayload().getSubjectsNamesMapping()
.entrySet()
.stream()
.filter(e -> StringUtils.isNotBlank(e.getKey()))
.filter(e -> StringUtils.isNotBlank(e.getValue()))
.forEach(
e -> cacheProvider.expose(
action.getApiKey(),
c -> c.updateSubjectName(e.getKey(), e.getValue())
)
);
}

public <T> void invalidate(CacheActionDto<T> action) {
cacheProvider.expose(
action.getApiKey(),
e -> cacheProvider.receiveInvalidateCache(action.getApiKey())
);
}

/**
* @param action cacheAction
* @deprecated in favour more granular cache managing.
* See {@link CacheActionDto}.
* Stays here to support rolling update
*/
@Deprecated(forRemoval = true)
public <T> void handleDelete(CacheActionDto<T> action) {
cacheProvider.receiveInvalidateCache(action.getApiKey());
}

/**
* @param action cacheAction
* @deprecated in favour more granular cache managing.
* See {@link CacheActionDto}.
* Stays here to support rolling update
*/
@Deprecated(forRemoval = true)
public <T> void handleUpdate(CacheActionDto<T> action) {
cacheProvider.receivePutOnCache(action.getApiKey());
}
}
Loading

0 comments on commit f205da2

Please sign in to comment.