Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

provide more granular way to manage embeddings' cache #1202

Open
wants to merge 1 commit into
base: 1.2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
package com.exadel.frs.core.trainservice.cache;

import com.exadel.frs.commonservice.entity.Embedding;
import com.exadel.frs.commonservice.projection.EmbeddingProjection;
import com.exadel.frs.core.trainservice.dto.CacheActionDto;
import com.exadel.frs.core.trainservice.dto.CacheActionDto.AddEmbeddings;
import com.exadel.frs.core.trainservice.dto.CacheActionDto.CacheAction;
import com.exadel.frs.core.trainservice.dto.CacheActionDto.RemoveEmbeddings;
import com.exadel.frs.core.trainservice.dto.CacheActionDto.RemoveSubjects;
import com.exadel.frs.core.trainservice.dto.CacheActionDto.RenameSubjects;
import com.exadel.frs.core.trainservice.service.EmbeddingService;
import com.exadel.frs.core.trainservice.service.NotificationSenderService;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import static com.exadel.frs.core.trainservice.system.global.Constants.SERVER_UUID;

Expand All @@ -34,34 +42,81 @@ public class EmbeddingCacheProvider {
.build();

public EmbeddingCollection getOrLoad(final String apiKey) {

var result = cache.getIfPresent(apiKey);

if (result == null) {
result = embeddingService.doWithEnhancedEmbeddingProjectionStream(apiKey, EmbeddingCollection::from);

cache.put(apiKey, result);

notifyCacheEvent("UPDATE", apiKey);
}

return result;
}

public void ifPresent(String apiKey, Consumer<EmbeddingCollection> consumer) {
public void removeEmbedding(String apiKey, EmbeddingProjection embedding) {
Optional.ofNullable(cache.getIfPresent(apiKey))
.ifPresent(consumer);
.ifPresent(
ec -> {
ec.removeEmbedding(embedding);
notifyCacheEvent(
CacheAction.REMOVE_EMBEDDINGS,
apiKey,
new RemoveEmbeddings(Map.of(embedding.subjectName(), List.of(embedding.embeddingId())))
);
}
);
}

cache.getIfPresent(apiKey);
notifyCacheEvent("UPDATE", apiKey);
public void updateSubjectName(String apiKey, String oldSubjectName, String newSubjectName) {
Optional.ofNullable(cache.getIfPresent(apiKey))
.ifPresent(
ec -> {
ec.updateSubjectName(oldSubjectName, newSubjectName);
notifyCacheEvent(CacheAction.RENAME_SUBJECTS, apiKey, new RenameSubjects(Map.of(oldSubjectName, newSubjectName)));
}
);
}

public void removeBySubjectName(String apiKey, String subjectName) {
Optional.ofNullable(cache.getIfPresent(apiKey))
.ifPresent(
ec -> {
ec.removeEmbeddingsBySubjectName(subjectName);
notifyCacheEvent(CacheAction.REMOVE_SUBJECTS, apiKey, new RemoveSubjects(List.of(subjectName)));
}
);
}


public void addEmbedding(String apiKey, Embedding embedding) {
Optional.ofNullable(cache.getIfPresent(apiKey))
.ifPresent(
ec -> {
ec.addEmbedding(embedding);
notifyCacheEvent(CacheAction.ADD_EMBEDDINGS, apiKey, new AddEmbeddings(List.of(embedding.getId())));
}
);
}

/**
* Method can be used to make changes in cache without sending notification.
* Use it carefully, because changes you do will not be visible for other compreface-api instances
*
* @param apiKey domain
* @param action what to do with {@link EmbeddingCollection}
*/
public void expose(String apiKey, Consumer<EmbeddingCollection> action) {
Optional.ofNullable(cache.getIfPresent(apiKey))
.ifPresent(action);
}

public void invalidate(final String apiKey) {
cache.invalidate(apiKey);
notifyCacheEvent("DELETE", apiKey);
notifyCacheEvent(CacheAction.INVALIDATE, apiKey, null);
}


/**
* @deprecated
* See {@link com.exadel.frs.core.trainservice.service.NotificationHandler#handleUpdate(CacheActionDto)}
*/
@Deprecated(forRemoval = true)
public void receivePutOnCache(String apiKey) {
var result = embeddingService.doWithEnhancedEmbeddingProjectionStream(apiKey, EmbeddingCollection::from);
cache.put(apiKey, result);
Expand All @@ -71,8 +126,8 @@ public void receiveInvalidateCache(final String apiKey) {
cache.invalidate(apiKey);
}

private void notifyCacheEvent(String event, String apiKey) {
CacheActionDto cacheActionDto = new CacheActionDto(event, apiKey, SERVER_UUID);
private <T> void notifyCacheEvent(CacheAction event, String apiKey, T action) {
CacheActionDto<T> cacheActionDto = new CacheActionDto<>(event, apiKey, SERVER_UUID, action);
notificationSenderService.notifyCacheChange(cacheActionDto);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ public Collection<String> getSubjectNames(final String apiKey) {
return subjectRepository.getSubjectNames(apiKey);
}

public List<Embedding> loadAllEmbeddingsByIds(Iterable<UUID> ids) {
return embeddingRepository.findByIdIn(ids);
}

@Transactional
public Subject deleteSubjectByName(final String apiKey, final String subjectName) {
final Optional<Subject> subjectOptional = subjectRepository.findByApiKeyAndSubjectNameIgnoreCase(apiKey, subjectName);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,76 @@
package com.exadel.frs.core.trainservice.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CacheActionDto {
@JsonIgnoreProperties(ignoreUnknown = true) // here and below "ignoreUnknown = true" for backward compatibility
public class CacheActionDto<T> {
private CacheAction cacheAction;
private String apiKey;
@JsonProperty("uuid")
private UUID serverUUID;
private T payload;

@JsonProperty("cacheAction")
private String cacheAction;
public <S> CacheActionDto<S> withPayload(S newPayload) {
return new CacheActionDto<>(
cacheAction,
apiKey,
serverUUID,
newPayload
);
}

@JsonProperty("apiKey")
private String apiKey;
public enum CacheAction {
// UPDATE and DELETE stays here to support rolling update
@Deprecated
UPDATE,
@Deprecated
DELETE,
REMOVE_EMBEDDINGS,
REMOVE_SUBJECTS,
ADD_EMBEDDINGS,
RENAME_SUBJECTS,
INVALIDATE
}

@JsonProperty("uuid")
private String serverUUID;
@Data
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public static class RemoveEmbeddings {
private Map<String, List<UUID>> embeddings;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public static class RemoveSubjects {
private List<String> subjects;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public static class AddEmbeddings {
private List<UUID> embeddings;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public static class RenameSubjects {
private Map<String, String> subjectsNamesMapping;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package com.exadel.frs.core.trainservice.service;

import com.exadel.frs.commonservice.projection.EmbeddingProjection;
import com.exadel.frs.core.trainservice.cache.EmbeddingCacheProvider;
import com.exadel.frs.core.trainservice.dto.CacheActionDto;
import com.exadel.frs.core.trainservice.dto.CacheActionDto.AddEmbeddings;
import com.exadel.frs.core.trainservice.dto.CacheActionDto.RemoveEmbeddings;
import com.exadel.frs.core.trainservice.dto.CacheActionDto.RemoveSubjects;
import com.exadel.frs.core.trainservice.dto.CacheActionDto.RenameSubjects;
import java.util.Objects;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
public class NotificationHandler {
private final EmbeddingCacheProvider cacheProvider;
private final SubjectService subjectService;

public void removeEmbeddings(CacheActionDto<RemoveEmbeddings> action) {
action.getPayload().getEmbeddings()
.entrySet()
.stream()
.filter(e -> StringUtils.isNotBlank(e.getKey()))
.filter(e -> Objects.nonNull(e.getValue()))
.filter(e -> !e.getValue().isEmpty())
.flatMap(e -> e.getValue().stream().filter(Objects::nonNull).map(id -> new EmbeddingProjection(id, e.getKey())))
.forEach(
em -> cacheProvider.expose(
action.getApiKey(),
c -> c.removeEmbedding(em)
)
);
}

public void removeSubjects(CacheActionDto<RemoveSubjects> action) {
action.getPayload().getSubjects()
.stream()
.filter(StringUtils::isNotBlank)
.forEach(
s -> cacheProvider.expose(
action.getApiKey(),
c -> c.removeEmbeddingsBySubjectName(s)
)
);
}


public void addEmbeddings(CacheActionDto<AddEmbeddings> action) {
var filtered = action.getPayload().getEmbeddings()
.stream()
.filter(Objects::nonNull)
.toList();
subjectService.loadEmbeddingsById(filtered)
.forEach(
em -> cacheProvider.expose(
action.getApiKey(),
c -> c.addEmbedding(em)
)
);
}

public void renameSubjects(CacheActionDto<RenameSubjects> action) {
action.getPayload().getSubjectsNamesMapping()
.entrySet()
.stream()
.filter(e -> StringUtils.isNotBlank(e.getKey()))
.filter(e -> StringUtils.isNotBlank(e.getValue()))
.forEach(
e -> cacheProvider.expose(
action.getApiKey(),
c -> c.updateSubjectName(e.getKey(), e.getValue())
)
);
}

public <T> void invalidate(CacheActionDto<T> action) {
cacheProvider.expose(
action.getApiKey(),
e -> cacheProvider.receiveInvalidateCache(action.getApiKey())
);
}

/**
* @param action cacheAction
* @deprecated in favour more granular cache managing.
* See {@link CacheActionDto}.
* Stays here to support rolling update
*/
@Deprecated(forRemoval = true)
public <T> void handleDelete(CacheActionDto<T> action) {
cacheProvider.receiveInvalidateCache(action.getApiKey());
}

/**
* @param action cacheAction
* @deprecated in favour more granular cache managing.
* See {@link CacheActionDto}.
* Stays here to support rolling update
*/
@Deprecated(forRemoval = true)
public <T> void handleUpdate(CacheActionDto<T> action) {
cacheProvider.receivePutOnCache(action.getApiKey());
}
}
Loading
Loading