Skip to content

Commit

Permalink
Merge pull request #405 from perfectsense/feature/batch-refresh
Browse files Browse the repository at this point in the history
Batch Refresh
  • Loading branch information
deepanjan90 authored Sep 6, 2022
2 parents 66913a3 + 9e3980e commit 0118a88
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 64 deletions.
157 changes: 104 additions & 53 deletions core/src/main/java/gyro/core/command/AbstractConfigCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -32,9 +34,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import gyro.core.GyroCore;
Expand All @@ -49,11 +48,13 @@
import gyro.core.diff.ChangeSettings;
import gyro.core.resource.DiffableInternals;
import gyro.core.resource.DiffableType;
import gyro.core.resource.RefreshException;
import gyro.core.resource.Resource;
import gyro.core.scope.FileScope;
import gyro.core.scope.RootScope;
import gyro.core.scope.Scope;
import gyro.core.scope.State;
import org.apache.commons.lang.time.StopWatch;
import picocli.CommandLine.Option;
import picocli.CommandLine.Parameters;

Expand Down Expand Up @@ -194,18 +195,13 @@ protected void doExecute() throws Exception {
}

private void refreshResources(RootScope scope) {
ScheduledExecutorService messageService = Executors.newSingleThreadScheduledExecutor();
GyroUI ui = GyroCore.ui();
AtomicInteger started = new AtomicInteger();
AtomicInteger done = new AtomicInteger();

messageService.scheduleAtFixedRate(() -> {
ui.replace("@|magenta ⟳ Refreshing resources:|@ %s started, %s done", started.get(), done.get());
}, 0, 100, TimeUnit.MILLISECONDS);

ExecutorService refreshService = Executors.newWorkStealingPool(25);
ExecutorService refreshService = Executors.newWorkStealingPool();
List<Refresh> refreshes = new ArrayList<>();
Map<DiffableType, List<Resource>> refreshQueues = new HashMap<>();

// Group Resources by type.
for (FileScope fileScope : scope.getFileScopes()) {
String currentDirectory = System.getProperty("user.dir");
String currentFile = GyroCore.getRootDirectory().resolve(fileScope.getFile()).getParent().toString();
Expand All @@ -226,84 +222,139 @@ private void refreshResources(RootScope scope) {
processors.addAll(0, s.getSettings(ChangeSettings.class).getProcessors());
}

refreshes.add(new Refresh(resource, ui, refreshService.submit(() -> {
GyroCore.pushUi(ui);
List<Resource> refreshQueue =
refreshQueues.computeIfAbsent(DiffableType.getInstance(resource), m -> new ArrayList<>());
refreshQueue.add(resource);
}
}

// Refresh each type as a group.
for (DiffableType type : refreshQueues.keySet()) {
List<Resource> refreshQueue = refreshQueues.get(type);

refreshes.add(new Refresh(refreshQueue, ui, refreshService.submit(() -> {
GyroCore.pushUi(ui);

if (refreshQueue.isEmpty()) {
return null;
}

Resource peek = refreshQueue.get(0);
StopWatch stopWatch = new StopWatch();
stopWatch.start();

started.incrementAndGet();
// Run beforeRefresh processors
for (Resource resource : refreshQueue) {
List<ChangeProcessor> processors = new ArrayList<>();
for (Scope s = DiffableInternals.getScope(resource); s != null; s = s.getParent()) {
processors.addAll(0, s.getSettings(ChangeSettings.class).getProcessors());
}

for (ChangeProcessor processor : processors) {
processor.beforeRefresh(ui, resource);
}
}

boolean keep = resource.refresh();
Map<? extends Resource, Boolean> refreshResults = peek.batchRefresh(refreshQueue);

// Run afterRefresh processors
for (Resource resource : refreshQueue) {
List<ChangeProcessor> processors = new ArrayList<>();
for (Scope s = DiffableInternals.getScope(resource); s != null; s = s.getParent()) {
processors.addAll(0, s.getSettings(ChangeSettings.class).getProcessors());
}

for (ChangeProcessor processor : processors) {
processor.afterRefresh(ui, resource);
}

if (keep) {
if (refreshResults.containsKey(resource) && refreshResults.get(resource)) {
DiffableInternals.getModifications(resource).forEach(m -> m.refresh(resource));
DiffableInternals.disconnect(resource, true);
DiffableInternals.update(resource);
}
}

done.incrementAndGet();
stopWatch.stop();
Duration duration = Duration.ofMillis(stopWatch.getTime());

if (keep) {
DiffableInternals.disconnect(resource, true);
DiffableInternals.update(resource);
return false;
String time = "";
if (duration.getSeconds() <= 10) {
time = String.format("%dms", duration.toMillis());
} else {
time = String.format("%dm%ds", duration.toMinutes(), (duration.getSeconds() - (duration.toMinutes() * 60)));
}

} else {
return true;
}
})));
}
}
ui.write("Refreshing @|magenta,bold %s|@: @|green %s|@ %s refreshed in @|green %s|@ elapsed\n",
DiffableType.getInstance(peek).getName(),
refreshResults.size(),
refreshResults.size() == 1 ? "resource" : "resources",
time);

refreshService.shutdown();
for (Resource resource : refreshResults.keySet()) {
boolean refreshed = refreshResults.get(resource);

for (Refresh refresh : refreshes) {
Resource resource = refresh.resource;
String typeName = DiffableType.getInstance(resource).getName();
String name = DiffableInternals.getName(resource);
String typeName = DiffableType.getInstance(resource).getName();
String name = DiffableInternals.getName(resource);

try {
if (refresh.future.get()) {
ui.replace("@|magenta - Removing from state:|@ %s %s\n", typeName, name);
scope.getFileScopes().forEach(s -> s.remove(resource.primaryKey()));
if (!refreshed) {
ui.replace("@|magenta - Removing from state:|@ %s %s\n", typeName, name);
scope.getFileScopes().forEach(s -> s.remove(resource.primaryKey()));
}
}

} catch (ExecutionException error) {
refreshService.shutdownNow();
messageService.shutdown();

ui.write("\n");
return null;
})));

throw new GyroException(
String.format("Can't refresh @|bold %s %s|@ resource!", typeName, name),
error.getCause());
for (Refresh refresh : refreshes) {
try {
refresh.future.get();
} catch (InterruptedException erorr) {
Thread.currentThread().interrupt();
return;
} catch (ExecutionException error) {
Throwable cause = error.getCause();
if (cause instanceof RefreshException) {
ui.write("\n");

RefreshException refreshException = (RefreshException) cause;

Resource resource = refreshException.getResource();
String typeName = DiffableType.getInstance(resource).getName();
String name = DiffableInternals.getName(resource);

throw new GyroException(
String.format("Can't refresh @|bold %s %s|@ resource!", typeName, name),
error.getCause());
} else {

} catch (InterruptedException error) {
Thread.currentThread().interrupt();
return;
throw new GyroException(
String.format("Can't refresh @|bold %s |@ resource group!", refresh.typeName()),
error.getCause());
}
}
}
}

messageService.shutdown();
ui.replace("@|magenta ⟳ Refreshed resources:|@ %s\n", refreshes.size());
refreshService.shutdown();
}

private static class Refresh {

public final Resource resource;
public final Future<Boolean> future;
public final List<Resource> resources;
public final Future<?> future;
public final GyroUI ui;

public Refresh(Resource resource, GyroUI ui, Future<Boolean> future) {
this.resource = resource;
public Refresh(List<Resource> resources, GyroUI ui, Future<?> future) {
this.resources = resources;
this.ui = ui;
this.future = future;
}

public String typeName() {
Resource resource = resources.get(0);
return DiffableType.getInstance(resource).getName();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,6 @@ private synchronized String stop(GyroUI ui, Resource resource) {
}
}

@Override
public void beforeRefresh(GyroUI ui, Resource resource) throws Exception {
start(resource);
}

@Override
public void afterRefresh(GyroUI ui, Resource resource) throws Exception {
String elapsed = stop(ui, resource);
ui.write("Refreshing @|magenta,bold %s|@ took: @|green %s|@\n", resource.primaryKey(), elapsed);
}

@Override
public void beforeCreate(GyroUI ui, State state, Resource resource) throws Exception {
start(resource);
Expand Down
39 changes: 39 additions & 0 deletions core/src/main/java/gyro/core/resource/RefreshException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package gyro.core.resource;

public class RefreshException extends RuntimeException {

private Resource resource;

public RefreshException(Resource resource) {
this.resource = resource;
}

public RefreshException(String message, Resource resource) {
super(message);
this.resource = resource;
}

public RefreshException(String message, Throwable cause, Resource resource) {
super(message, cause);
this.resource = resource;
}

public RefreshException(Throwable cause, Resource resource) {
super(cause);
this.resource = resource;
}

public RefreshException(
String message,
Throwable cause,
boolean enableSuppression,
boolean writableStackTrace,
Resource resource) {
super(message, cause, enableSuppression, writableStackTrace);
this.resource = resource;
}

public Resource getResource() {
return resource;
}
}
44 changes: 44 additions & 0 deletions core/src/main/java/gyro/core/resource/Resource.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,16 @@

package gyro.core.resource;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import gyro.core.GyroUI;
import gyro.core.auth.Credentials;
Expand All @@ -27,6 +35,31 @@ public abstract class Resource extends Diffable {

public abstract boolean refresh();

public Map<? extends Resource, Boolean> batchRefresh(List<? extends Resource> resources) {
Map<Resource, Boolean> refreshResults = new HashMap<>();

ExecutorService refreshService = Executors.newWorkStealingPool();
List<Refresh> refreshes = new ArrayList<>();

for (Resource resource : resources) {
refreshes.add(new Refresh(resource, refreshService.submit(resource::refresh)));
}

for (Refresh refresh : refreshes) {
try {
refreshResults.put(refresh.resource, refresh.future.get());
} catch (ExecutionException error) {
throw new RefreshException(error, refresh.resource);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
} finally {
refreshService.shutdown();
}
}

return refreshResults;
}

public abstract void create(GyroUI ui, State state) throws Exception;

public abstract void update(GyroUI ui, State state, Resource current, Set<String> changedFieldNames)
Expand All @@ -52,4 +85,15 @@ public Object get(String key) {
public String primaryKey() {
return String.format("%s::%s", DiffableType.getInstance(getClass()).getName(), name);
}

private static class Refresh {

public final Resource resource;
public final Future<Boolean> future;

public Refresh(Resource resource, Future<Boolean> future) {
this.resource = resource;
this.future = future;
}
}
}

0 comments on commit 0118a88

Please sign in to comment.