Skip to content

Commit

Permalink
RxJava and Guava Rx Client Providers.
Browse files Browse the repository at this point in the history
+ cleaning up ext/rx (removing deprecated rx, rx-java8 and rx-jsr166e modules)

Change-Id: I95c4f2f33931de793ed98dd0eda0044f792950a3
  • Loading branch information
pavelbucek committed Feb 20, 2017
1 parent a8d6c4b commit adc5115
Show file tree
Hide file tree
Showing 68 changed files with 832 additions and 5,955 deletions.
16 changes: 3 additions & 13 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
Copyright (c) 2013-2016 Oracle and/or its affiliates. All rights reserved.
Copyright (c) 2013-2017 Oracle and/or its affiliates. All rights reserved.
The contents of this file are subject to the terms of either the GNU
General Public License Version 2 only ("GPL") or the Common Development
Expand Down Expand Up @@ -241,29 +241,19 @@
<artifactId>jersey-cdi1x-ban-custom-hk2-binding</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.ext.rx</groupId>
<artifactId>jersey-rx-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.ext.rx</groupId>
<artifactId>jersey-rx-client-guava</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.ext.rx</groupId>
<artifactId>jersey-rx-client-java8</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.ext.rx</groupId>
<artifactId>jersey-rx-client-jsr166e</artifactId>
<artifactId>jersey-rx-client-rxjava</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.ext.rx</groupId>
<artifactId>jersey-rx-client-rxjava</artifactId>
<artifactId>jersey-rx-client-rxjava2</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
Expand Down
9 changes: 6 additions & 3 deletions examples/rx-client-webapp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,18 @@
<groupId>org.glassfish.jersey.containers</groupId>
<artifactId>jersey-container-servlet</artifactId>
</dependency>
<!-- TODO: reimplement RX support for RxJava and ListenableFuture -->
<!--dependency>
<dependency>
<groupId>org.glassfish.jersey.ext.rx</groupId>
<artifactId>jersey-rx-client-guava</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.ext.rx</groupId>
<artifactId>jersey-rx-client-rxjava</artifactId>
</dependency-->
</dependency>
<dependency>
<groupId>org.glassfish.jersey.ext.rx</groupId>
<artifactId>jersey-rx-client-rxjava2</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-json-jackson</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@

import org.glassfish.jersey.examples.rx.agent.AsyncAgentResource;
import org.glassfish.jersey.examples.rx.agent.CompletionStageAgentResource;
import org.glassfish.jersey.examples.rx.agent.FlowableAgentResource;
import org.glassfish.jersey.examples.rx.agent.ListenableFutureAgentResource;
import org.glassfish.jersey.examples.rx.agent.ObservableAgentResource;
import org.glassfish.jersey.examples.rx.agent.SyncAgentResource;
import org.glassfish.jersey.examples.rx.remote.CalculationResource;
import org.glassfish.jersey.examples.rx.remote.DestinationResource;
Expand All @@ -70,9 +73,9 @@ public RxApplication() {
// Agent (Client) Resources.
register(SyncAgentResource.class);
register(AsyncAgentResource.class);
// TODO: reimplement RX support for RxJava and ListenableFuture
// register(ObservableAgentResource.class);
// register(ListenableFutureAgentResource.class);
register(ObservableAgentResource.class);
register(FlowableAgentResource.class);
register(ListenableFutureAgentResource.class);
register(CompletionStageAgentResource.class);

// Providers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@
import org.glassfish.jersey.server.Uri;

/**
* Obtain information about visited (destination) and recommended (destination, forecast, price) places for "Java8" user. Uses
* Java 8 CompletionStage and Jersey Client to obtain the data.
* Obtain information about visited (destination) and recommended (destination, forecast, price) places for
* "CompletionStage" user. Uses Java 8 CompletionStage and Jersey Client to obtain the data.
*
* @author Michal Gajdos
*/
Expand Down Expand Up @@ -114,7 +114,7 @@ private CompletionStage<List<Destination>> visited(final WebTarget destinationTa
final Queue<String> errors) {
return destinationTarget.path("visited").request()
// Identify the user.
.header("Rx-User", "Java8")
.header("Rx-User", "CompletionStage")
// Reactive invoker.
.rx(executor)
// Return a list of destinations.
Expand All @@ -132,7 +132,7 @@ private CompletionStage<List<Recommendation>> recommended(final WebTarget destin
final CompletionStage<List<Destination>> recommended = destinationTarget.path("recommended")
.request()
// Identify the user.
.header("Rx-User", "Java8")
.header("Rx-User", "CompletionStage")
// Reactive invoker.
.rx(executor)
// Return a list of destinations.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright (c) 2017 Oracle and/or its affiliates. All rights reserved.
*
* The contents of this file are subject to the terms of either the GNU
* General Public License Version 2 only ("GPL") or the Common Development
* and Distribution License("CDDL") (collectively, the "License"). You
* may not use this file except in compliance with the License. You can
* obtain a copy of the License at
* http://glassfish.java.net/public/CDDL+GPL_1_1.html
* or packager/legal/LICENSE.txt. See the License for the specific
* language governing permissions and limitations under the License.
*
* When distributing the software, include this License Header Notice in each
* file and include the License file at packager/legal/LICENSE.txt.
*
* GPL Classpath Exception:
* Oracle designates this particular file as subject to the "Classpath"
* exception as provided by Oracle in the GPL Version 2 section of the License
* file that accompanied this code.
*
* Modifications:
* If applicable, add the following below the License Header, with the fields
* enclosed by brackets [] replaced by your own identifying information:
* "Portions Copyright [year] [name of copyright owner]"
*
* Contributor(s):
* If you wish your version of this file to be governed by only the CDDL or
* only the GPL Version 2, indicate your decision by adding "[Contributor]
* elects to include this software in this distribution under the [CDDL or GPL
* Version 2] license." If you don't indicate a single choice of license, a
* recipient has the option to distribute your version of this file under
* either the CDDL, the GPL Version 2 or to extend the choice of license to
* its licensees as provided above. However, if you add GPL Version 2 code
* and therefore, elected the GPL Version 2 license, then the option applies
* only if the new code is made subject to such option by the copyright
* holder.
*/

package org.glassfish.jersey.examples.rx.agent;

import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.GenericType;

import javax.inject.Singleton;

import org.glassfish.jersey.client.rx.rxjava2.RxFlowableInvoker;
import org.glassfish.jersey.client.rx.rxjava2.RxFlowableInvokerProvider;
import org.glassfish.jersey.examples.rx.domain.AgentResponse;
import org.glassfish.jersey.examples.rx.domain.Calculation;
import org.glassfish.jersey.examples.rx.domain.Destination;
import org.glassfish.jersey.examples.rx.domain.Forecast;
import org.glassfish.jersey.examples.rx.domain.Recommendation;
import org.glassfish.jersey.server.Uri;

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

/**
* Obtain information about visited (destination) and recommended (destination, forecast, price) places for "RxJava2"
* user. Uses RxJava2 Flowable and Jersey Client to obtain the data.
*
* @author Michal Gajdos
* @author Pavel Bucek (pavel.bucek at oracle.com)
*/
@Singleton
@Path("agent/flowable")
@Produces("application/json")
public class FlowableAgentResource {

@Uri("remote/destination")
private WebTarget destination;

@Uri("remote/calculation/from/{from}/to/{to}")
private WebTarget calculation;

@Uri("remote/forecast/{destination}")
private WebTarget forecast;

@GET
public void flowable(@Suspended final AsyncResponse async) {
final long time = System.nanoTime();
final Queue<String> errors = new ConcurrentLinkedQueue<>();

Flowable.just(new AgentResponse())
// Obtain visited destinations.
.zipWith(visited(errors), (agentResponse, visited) -> {
agentResponse.setVisited(visited);
return agentResponse;
})
// Obtain recommended destinations. (does not depend on visited ones)
.zipWith(
recommended(errors),
(agentResponse, recommendations) -> {
agentResponse.setRecommended(recommendations);
return agentResponse;
})
// Observe on another thread than the one processing visited or recommended destinations.
.observeOn(Schedulers.io())
// Subscribe.
.subscribe(agentResponse -> {
agentResponse.setProcessingTime((System.nanoTime() - time) / 1000000);
async.resume(agentResponse);

}, async::resume);
}

private Flowable<List<Destination>> visited(final Queue<String> errors) {
destination.register(RxFlowableInvokerProvider.class);

return destination.path("visited").request()
// Identify the user.
.header("Rx-User", "RxJava2")
// Reactive invoker.
.rx(RxFlowableInvoker.class)
// Return a list of destinations.
.get(new GenericType<List<Destination>>() {
})
// Handle Errors.
.onErrorReturn(throwable -> {
errors.offer("Visited: " + throwable.getMessage());
return Collections.emptyList();
});
}

private Flowable<List<Recommendation>> recommended(final Queue<String> errors) {
destination.register(RxFlowableInvokerProvider.class);

// Recommended places.
final Flowable<Destination> recommended = destination.path("recommended").request()
// Identify the user.
.header("Rx-User", "RxJava2")
// Reactive invoker.
.rx(RxFlowableInvoker.class)
// Return a list of destinations.
.get(new GenericType<List<Destination>>() {
})
// Handle Errors.
.onErrorReturn(throwable -> {
errors.offer("Recommended: " + throwable
.getMessage());
return Collections.emptyList();
})
// Emit destinations one-by-one.
.flatMap(Flowable::fromIterable)
// Remember emitted items for dependant requests.
.cache();

forecast.register(RxFlowableInvokerProvider.class);

// Forecasts. (depend on recommended destinations)
final Flowable<Forecast> forecasts = recommended.flatMap(destination ->
forecast
.resolveTemplate("destination", destination.getDestination())
.request().rx(RxFlowableInvoker.class).get(Forecast.class)
.onErrorReturn(throwable -> {
errors.offer("Forecast: " + throwable.getMessage());
return new Forecast(destination.getDestination(), "N/A");
}));

calculation.register(RxFlowableInvokerProvider.class);

// Calculations. (depend on recommended destinations)
final Flowable<Calculation> calculations = recommended.flatMap(destination -> {
return calculation.resolveTemplate("from", "Moon").resolveTemplate("to", destination.getDestination())
.request().rx(RxFlowableInvoker.class).get(Calculation.class)
.onErrorReturn(throwable -> {
errors.offer("Calculation: " + throwable.getMessage());
return new Calculation("Moon", destination.getDestination(), -1);
});
});

return Flowable.zip(recommended, forecasts, calculations, Recommendation::new).buffer(Integer.MAX_VALUE);
}
}
Loading

0 comments on commit adc5115

Please sign in to comment.