Skip to content

Commit

Permalink
disable change history during persistence (#305)
Browse files Browse the repository at this point in the history
* disable change history during persistency

* addressed review comments.

* fix comments and adding an unit test
  • Loading branch information
yangyangv2 authored Oct 25, 2023
1 parent 5e5cd07 commit b38cf9d
Show file tree
Hide file tree
Showing 9 changed files with 841 additions and 278 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
import com.linkedin.metadata.dao.utils.RecordUtils;
import com.linkedin.metadata.dao.utils.SQLSchemaUtils;
import com.linkedin.metadata.dao.utils.SQLStatementUtils;
import com.linkedin.metadata.query.ExtraInfo;
import com.linkedin.metadata.query.ExtraInfoArray;
import com.linkedin.metadata.query.IndexFilter;
import com.linkedin.metadata.query.IndexGroupByCriterion;
import com.linkedin.metadata.query.IndexSortCriterion;
import com.linkedin.metadata.query.ListResultMetadata;
import io.ebean.EbeanServer;
import io.ebean.SqlQuery;
import io.ebean.SqlRow;
Expand All @@ -33,15 +36,18 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.persistence.PersistenceException;
import lombok.extern.slf4j.Slf4j;
import org.json.simple.JSONObject;

import static com.linkedin.metadata.dao.EbeanLocalDAO.*;
import static com.linkedin.metadata.dao.utils.EBeanDAOUtils.*;
import static com.linkedin.metadata.dao.utils.SQLIndexFilterUtils.*;
import static com.linkedin.metadata.dao.utils.SQLSchemaUtils.*;
Expand Down Expand Up @@ -159,10 +165,12 @@ public <ASPECT extends RecordTemplate> int add(@Nonnull URN urn, @Nullable ASPEC
* @param aspectKeys a List of keys (urn, aspect pairings) to query for
* @param keysCount number of keys to query
* @param position position of the key to start from
* @param includeSoftDeleted whether to include soft deleted aspect in the query
*/
@Override
public <ASPECT extends RecordTemplate> List<EbeanMetadataAspect> batchGetUnion(
@Nonnull List<AspectKey<URN, ? extends RecordTemplate>> aspectKeys, int keysCount, int position) {
@Nonnull List<AspectKey<URN, ? extends RecordTemplate>> aspectKeys, int keysCount, int position,
boolean includeSoftDeleted) {

final int end = Math.min(aspectKeys.size(), position + keysCount);
final Map<Class<ASPECT>, Set<Urn>> keysToQueryMap = new HashMap<>();
Expand All @@ -178,12 +186,19 @@ public <ASPECT extends RecordTemplate> List<EbeanMetadataAspect> batchGetUnion(
}

// each statement is for a single aspect class
List<String> selectStatements = keysToQueryMap.entrySet().stream()
.map(entry -> SQLStatementUtils.createAspectReadSql(entry.getKey(), entry.getValue()))
.collect(Collectors.toList());
Map<String, Class<ASPECT>> selectStatements = keysToQueryMap.entrySet()
.stream()
.collect(Collectors.toMap(
entry -> SQLStatementUtils.createAspectReadSql(entry.getKey(), entry.getValue(), includeSoftDeleted),
entry -> entry.getKey()));

// consolidate/join the results
List<SqlRow> sqlRows = selectStatements.stream().flatMap(sql -> _server.createSqlQuery(sql).findList().stream()).collect(Collectors.toList());
final Map<SqlRow, Class<ASPECT>> sqlRows = new LinkedHashMap<>();
for (Map.Entry<String, Class<ASPECT>> entry : selectStatements.entrySet()) {
for (SqlRow sqlRow : _server.createSqlQuery(entry.getKey()).findList()) {
sqlRows.put(sqlRow, entry.getValue());
}
}
return EBeanDAOUtils.readSqlRows(sqlRows);
}

Expand All @@ -206,7 +221,7 @@ public ListResult<URN> listUrns(@Nonnull IndexFilter indexFilter, @Nullable Inde
return toListResult(actualTotalCount, start, pageSize);
}
final List<URN> values = sqlRows.stream().map(sqlRow -> getUrn(sqlRow.getString("urn"), _urnClass)).collect(Collectors.toList());
return toListResult(values, sqlRows, start, pageSize);
return toListResult(values, sqlRows, null, start, pageSize);
}

@Override
Expand All @@ -233,9 +248,69 @@ public <ASPECT extends RecordTemplate> ListResult<URN> listUrns(@Nonnull Class<A
final List<URN> values = sqlRows.stream()
.map(sqlRow -> getUrn(sqlRow.getString("urn"), _urnClass))
.collect(Collectors.toList());
return toListResult(values, sqlRows, start, pageSize);
return toListResult(values, sqlRows, null, start, pageSize);
}

@Nonnull
@Override
public <ASPECT extends RecordTemplate> ListResult<ASPECT> list(@Nonnull Class<ASPECT> aspectClass, @Nonnull URN urn,
int start, int pageSize) {
// start / pageSize will be ignored since there will be at most one record returned from entity table.
final String listAspectByUrnSql = SQLStatementUtils.createListAspectByUrnSql(aspectClass, urn, false);
final SqlQuery sqlQuery = _server.createSqlQuery(listAspectByUrnSql);

try {
final SqlRow sqlRow = sqlQuery.findOne();
if (sqlRow == null) {
return toListResult(0, start, pageSize);
} else {
sqlRow.set("_total_count", 1);
final ASPECT aspect = RecordUtils.toRecordTemplate(aspectClass,
extractAspectJsonString(sqlRow.getString(getAspectColumnName(aspectClass))));
final ListResultMetadata listResultMetadata = new ListResultMetadata().setExtraInfos(new ExtraInfoArray());
final ExtraInfo extraInfo = new ExtraInfo().setUrn(urn)
.setVersion(LATEST_VERSION)
.setAudit(makeAuditStamp(sqlRow.getTimestamp("lastmodifiedon"), sqlRow.getString("lastmodifiedby"),
sqlRow.getString("createdfor")));
listResultMetadata.getExtraInfos().add(extraInfo);
return toListResult(Collections.singletonList(aspect), Collections.singletonList(sqlRow), listResultMetadata,
start, pageSize);
}
} catch (PersistenceException pe) {
throw new RuntimeException(
String.format("Expect at most 1 aspect value per entity per aspect type . Sql: %s", listAspectByUrnSql));
}
}




@Nonnull
@Override
public <ASPECT extends RecordTemplate> ListResult<ASPECT> list(@Nonnull Class<ASPECT> aspectClass, int start,
int pageSize) {

final String tableName = SQLSchemaUtils.getTableName(_entityType);
final String listAspectSql = SQLStatementUtils.createListAspectWithPaginationSql(aspectClass, tableName, false, start, pageSize);
final SqlQuery sqlQuery = _server.createSqlQuery(listAspectSql);
final List<SqlRow> sqlRows = sqlQuery.findList();
if (sqlRows.isEmpty()) {
return toListResult(0, start, pageSize);
}
final ListResultMetadata listResultMetadata = new ListResultMetadata().setExtraInfos(new ExtraInfoArray());
final List<ASPECT> aspectList = sqlRows.stream().map(sqlRow -> {
final ExtraInfo extraInfo = new ExtraInfo().setUrn(getUrn(sqlRow.getString("urn"), _urnClass))
.setVersion(LATEST_VERSION).setAudit(
makeAuditStamp(sqlRow.getTimestamp("lastmodifiedon"), sqlRow.getString("lastmodifiedby"),
sqlRow.getString("createdfor")));
listResultMetadata.getExtraInfos().add(extraInfo);
return RecordUtils.toRecordTemplate(aspectClass,
extractAspectJsonString(sqlRow.getString(getAspectColumnName(aspectClass))));
}).collect(Collectors.toList());
return toListResult(aspectList, sqlRows, listResultMetadata, start, pageSize);
}


@Nonnull
@Override
public Map<String, Long> countAggregate(@Nonnull IndexFilter indexFilter,
Expand Down Expand Up @@ -354,13 +429,14 @@ protected <T> ListResult<T> toListResult(int totalCount, int start, int pageSize
* Convert sqlRows into {@link ListResult}.
* @param values a list of query response result
* @param sqlRows list of {@link SqlRow} from ebean query execution
* @param listResultMetadata {@link ListResultMetadata} with {@link com.linkedin.metadata.query.ExtraInfo}
* @param start starting position
* @param pageSize number of rows in a page
* @param <T> type of query response
* @return {@link ListResult} which contains paging metadata information
*/
@Nonnull
protected <T> ListResult<T> toListResult(@Nonnull List<T> values, @Nonnull List<SqlRow> sqlRows,
protected <T> ListResult<T> toListResult(@Nonnull List<T> values, @Nonnull List<SqlRow> sqlRows, @Nullable ListResultMetadata listResultMetadata,
int start, int pageSize) {
if (pageSize == 0) {
pageSize = DEFAULT_PAGE_SIZE;
Expand All @@ -382,7 +458,7 @@ protected <T> ListResult<T> toListResult(@Nonnull List<T> values, @Nonnull List<
}
return ListResult.<T>builder()
.values(values)
.metadata(null)
.metadata(listResultMetadata)
.nextStart(nextStart)
.havingMore(hasNext)
.totalCount(totalCount)
Expand Down
Loading

0 comments on commit b38cf9d

Please sign in to comment.