diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java new file mode 100644 index 0000000000..354dd90282 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/AbstractPojoJoin.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.apex.malhar.lib.window.MergeAccumulation; +import org.apache.commons.lang3.ClassUtils; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.base.Throwables; + +import com.datatorrent.lib.util.PojoUtils; + +/** + * Join Accumulation for Pojo Streams. + * + */ +@InterfaceStability.Evolving +public abstract class AbstractPojoJoin + implements MergeAccumulation>>, List> +{ + protected final String[] keys; + protected final Class outClass; + private transient Map gettersStream1; + private transient Map gettersStream2; + private transient Map setters; + protected transient Set keySetStream2; + protected transient Set keySetStream1; + + public AbstractPojoJoin() + { + keys = new String[]{}; + outClass = null; + } + + public AbstractPojoJoin(Class outClass, String... keys) + { + if (keys.length % 2 != 0) { + throw new IllegalArgumentException("Wrong number of keys."); + } + + this.keys = Arrays.copyOf(keys, keys.length); + this.outClass = outClass; + } + + private void createSetters() + { + Field[] fields = outClass.getDeclaredFields(); + setters = new HashMap<>(); + for (Field field : fields) { + Class outputField = ClassUtils.primitiveToWrapper(field.getType()); + String fieldName = field.getName(); + setters.put(fieldName,PojoUtils.createSetter(outClass,fieldName,outputField)); + } + } + + private Map createGetters(Class input) + { + Field[] fields = input.getDeclaredFields(); + Map getters = new HashMap<>(); + for (Field field : fields) { + Class inputField = ClassUtils.primitiveToWrapper(field.getType()); + String fieldName = field.getName(); + getters.put(fieldName,PojoUtils.createGetter(input, fieldName, inputField)); + } + return getters; + } + + @Override + public List>> accumulate(List>> accumulatedValue, InputT1 input) + { + if (gettersStream1 == null) { + gettersStream1 = createGetters(input.getClass()); + } + try { + return accumulateWithIndex(0, accumulatedValue, input); + } catch (NoSuchFieldException e) { + throw Throwables.propagate(e); + } + } + + @Override + public List>> accumulate2(List>> accumulatedValue, InputT2 input) + { + if (gettersStream2 == null) { + gettersStream2 = createGetters(input.getClass()); + } + try { + return accumulateWithIndex(1, accumulatedValue, input); + } catch (NoSuchFieldException e) { + throw Throwables.propagate(e); + } + } + + + @Override + public List>> defaultAccumulatedValue() + { + List>> accu = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + accu.add(new ArrayList>()); + } + return accu; + } + + + private List>> accumulateWithIndex(int index, List>> accu, Object input) throws NoSuchFieldException + { + // TODO: If a stream never sends out any tuple during one window, a wrong key would not be detected. + + List> curList = accu.get(index); + Map map = pojoToMap(input,index + 1); + curList.add(map); + accu.set(index, curList); + + return accu; + } + + private Map pojoToMap(Object input, int streamIndex) + { + Map map = new HashMap<>(); + Map gettersStream = streamIndex == 1 ? gettersStream1 : gettersStream2; + + for (Map.Entry getter : gettersStream.entrySet()) { + try { + Object value = getter.getValue().get(input); + map.put(getter.getKey(), value); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + return map; + } + + @Override + public List>> merge(List>> accumulatedValue1, List>> accumulatedValue2) + { + for (int i = 0; i < 2; i++) { + List> curList = accumulatedValue1.get(i); + curList.addAll(accumulatedValue2.get(i)); + accumulatedValue1.set(i, curList); + } + return accumulatedValue1; + } + + @Override + public List getOutput(List>> accumulatedValue) + { + if (setters == null) { + createSetters(); + keySetStream2 = new HashSet<>(); + keySetStream1 = new HashSet<>(); + for (int i = 0; i < keys.length; i = i + 2) { + keySetStream1.add(keys[i]); + keySetStream2.add(keys[i + 1]); + } + } + + // TODO: May need to revisit (use state manager). + List> result = getAllCombo(0, accumulatedValue, null); + + List out = new ArrayList<>(); + for (Map resultMap : result) { + Object o; + try { + o = outClass.newInstance(); + } catch (Throwable e) { + throw Throwables.propagate(e); + } + for (Map.Entry setter : setters.entrySet()) { + if (resultMap.get(setter.getKey()) != null) { + setter.getValue().set(o, resultMap.get(setter.getKey())); + } + } + out.add(o); + } + + return out; + } + + + public List> getAllCombo(int streamIndex, List>> accu, Map curMap) + { + List> result = new ArrayList<>(); + int leftStreamIndex = getLeftStreamIndex(); + List> leftStream = accu.get(leftStreamIndex); + List> rightStream = accu.get((leftStreamIndex + 1) % 2); + for (Map lMap : leftStream) { + boolean gotMatch = false; + for (Map rMap : rightStream) { + Map tempMap = joinTwoMapsWithKeys(lMap, rMap); + if (tempMap != null) { + result.add(tempMap); + gotMatch = true; + } + } + if (!gotMatch) { + addNonMatchingResult(result, lMap, rightStream.get(0).keySet()); + } + } + return result; + } + + public abstract void addNonMatchingResult(List> result, Map requiredMap, Set nullFields); + + public abstract int getLeftStreamIndex(); + + + public Map joinTwoMapsWithKeys(Map map1, Map map2) + { + for (int i = 0; i < keys.length; i = i + 2) { + String key1 = keys[i]; + String key2 = keys[i + 1]; + if (!map1.get(key1).equals(map2.get(key2))) { + return null; + } + } + for (String field : map2.keySet()) { + if (!keySetStream2.contains(field)) { + map1.put(field, map2.get(field)); + } + } + return map1; + } + + @Override + public List getRetraction(List value) + { + return null; + } +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java new file mode 100644 index 0000000000..8ad0467c60 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoFullOuterJoin.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Full outer join Accumulation for Pojo Streams. + * + */ +@InterfaceStability.Evolving +public class PojoFullOuterJoin + extends AbstractPojoJoin +{ + public PojoFullOuterJoin() + { + super(); + } + + public PojoFullOuterJoin(int num, Class outClass, String... keys) + { + super(outClass,keys); + } + + @Override + public void addNonMatchingResult(List result, Map requiredMap, Set nullFields) + { + for (Object field : nullFields) { + if (!keySetStream2.contains(field)) { + requiredMap.put(field.toString(), null); + } + } + result.add(requiredMap); + } + + @Override + public int getLeftStreamIndex() + { + return 0; + } + + @Override + public List> getAllCombo(int streamIndex, List accu, Map curMap) + { + List> result = new ArrayList<>(); + int leftStreamIndex = getLeftStreamIndex(); + List> leftStream = (List>)accu.get(leftStreamIndex); + List> rightStream = (List>)accu.get((leftStreamIndex + 1) % 2); + Set> unMatchedRightStream = new HashSet<>(); + + for (Map rMap : rightStream) { + unMatchedRightStream.add(rMap); + } + for (Map lMap : leftStream) { + boolean gotMatch = false; + for (Map rMap : rightStream) { + Map tempMap = joinTwoMapsWithKeys(lMap, rMap); + if (tempMap != null) { + result.add(tempMap); + gotMatch = true; + if (unMatchedRightStream.contains(rMap)) { + unMatchedRightStream.remove(rMap); + } + } + } + if (!gotMatch) { + addNonMatchingResult(result, lMap, rightStream.get(0).keySet()); + } + } + + for (Map rMap : unMatchedRightStream) { + for (Object field : leftStream.get(0).keySet()) { + if (!keySetStream1.contains(field)) { + rMap.put(field.toString(), null); + } + } + result.add(rMap); + } + return result; + } +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java index 1aa55c2667..ceb17dd3d5 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoInnerJoin.java @@ -18,234 +18,38 @@ */ package org.apache.apex.malhar.lib.window.accumulation; -import java.lang.reflect.Field; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.apex.malhar.lib.window.MergeAccumulation; -import org.apache.commons.lang3.ClassUtils; - -import com.google.common.base.Throwables; - -import com.datatorrent.lib.util.KeyValPair; -import com.datatorrent.lib.util.PojoUtils; - /** * Inner join Accumulation for Pojo Streams. * * @since 3.6.0 */ public class PojoInnerJoin - implements MergeAccumulation>>, List> + extends AbstractPojoJoin { - protected final String[] keys; - protected final Class outClass; - private transient List> gettersStream1; - private transient List> gettersStream2; - private transient List> setters; - private transient Set keySetStream2; - public PojoInnerJoin() { - keys = new String[]{}; - outClass = null; + super(); } public PojoInnerJoin(int num, Class outClass, String... keys) { - if (keys.length % 2 != 0) { - throw new IllegalArgumentException("Wrong number of keys."); - } + super(outClass,keys); - this.keys = Arrays.copyOf(keys, keys.length); - this.outClass = outClass; - } - - private void createSetters() - { - Field[] fields = outClass.getDeclaredFields(); - setters = new ArrayList<>(); - for (Field field : fields) { - Class outputField = ClassUtils.primitiveToWrapper(field.getType()); - String fieldName = field.getName(); - setters.add(new KeyValPair<>(fieldName,PojoUtils.createSetter(outClass,fieldName,outputField))); - } - } - - private List> createGetters(Class input) - { - Field[] fields = input.getDeclaredFields(); - List> getters = new ArrayList<>(); - for (Field field : fields) { - Class inputField = ClassUtils.primitiveToWrapper(field.getType()); - String fieldName = field.getName(); - getters.add(new KeyValPair<>(fieldName,PojoUtils.createGetter(input, fieldName, inputField))); - } - return getters; } @Override - public List>> accumulate(List>> accumulatedValue, InputT1 input) - { - if (gettersStream1 == null) { - gettersStream1 = createGetters(input.getClass()); - } - try { - return accumulateWithIndex(0, accumulatedValue, input); - } catch (NoSuchFieldException e) { - throw Throwables.propagate(e); - } - } - - @Override - public List>> accumulate2(List>> accumulatedValue, InputT2 input) - { - if (gettersStream2 == null) { - gettersStream2 = createGetters(input.getClass()); - } - try { - return accumulateWithIndex(1, accumulatedValue, input); - } catch (NoSuchFieldException e) { - throw Throwables.propagate(e); - } - } - - - @Override - public List>> defaultAccumulatedValue() - { - List>> accu = new ArrayList<>(); - for (int i = 0; i < 2; i++) { - accu.add(new ArrayList>()); - } - return accu; - } - - - private List>> accumulateWithIndex(int index, List>> accu, Object input) throws NoSuchFieldException - { - // TODO: If a stream never sends out any tuple during one window, a wrong key would not be detected. - - input.getClass().getDeclaredField(keys[index]); - - List> curList = accu.get(index); - Map map = pojoToMap(input,index + 1); - curList.add(map); - accu.set(index, curList); - - return accu; - } - - private Map pojoToMap(Object input, int streamIndex) - { - Map map = new HashMap<>(); - List> gettersStream = streamIndex == 1 ? gettersStream1 : gettersStream2; - - for (KeyValPair getter : gettersStream) { - try { - Object value = getter.getValue().get(input); - map.put(getter.getKey(), value); - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - return map; - } - - @Override - public List>> merge(List>> accumulatedValue1, List>> accumulatedValue2) - { - for (int i = 0; i < 2; i++) { - List> curList = accumulatedValue1.get(i); - curList.addAll(accumulatedValue2.get(i)); - accumulatedValue1.set(i, curList); - } - return accumulatedValue1; - } - - @Override - public List getOutput(List>> accumulatedValue) - { - List> result = new ArrayList<>(); - if (setters == null) { - createSetters(); - keySetStream2 = new HashSet<>(); - for (int i = 0; i < keys.length; i = i + 2) { - keySetStream2.add(keys[i + 1]); - } - } - - // TODO: May need to revisit (use state manager). - result = getAllCombo(0, accumulatedValue, result, null); - - List out = new ArrayList<>(); - for (Map resultMap : result) { - Object o; - try { - o = outClass.newInstance(); - } catch (Throwable e) { - throw Throwables.propagate(e); - } - for (KeyValPair setter : setters) { - setter.getValue().set(o,resultMap.get(setter.getKey())); - } - out.add(o); - } - - return out; - } - - - private List> getAllCombo(int streamIndex, List>> accu, List> result, Map curMap) - { - if (streamIndex == 2) { - if (curMap != null) { - result.add(curMap); - } - return result; - } else { - for (Map map : accu.get(streamIndex)) { - if (streamIndex == 0) { - Map tempMap = new HashMap<>(map); - result = getAllCombo(streamIndex + 1, accu, result, tempMap); - } else if (curMap == null) { - return result; - } else { - Map tempMap = new HashMap<>(curMap); - tempMap = joinTwoMapsWithKeys(tempMap, map); - result = getAllCombo(streamIndex + 1, accu, result, tempMap); - } - } - return result; - } - } - - private Map joinTwoMapsWithKeys(Map map1, Map map2) + public void addNonMatchingResult(List result, Map requiredMap, Set nullFields) { - for (int i = 0; i < keys.length; i = i + 2) { - String key1 = keys[i]; - String key2 = keys[i + 1]; - if (!map1.get(key1).equals(map2.get(key2))) { - return null; - } - } - for (String field : map2.keySet()) { - if (!keySetStream2.contains(field)) { - map1.put(field, map2.get(field)); - } - } - return map1; + return; } @Override - public List getRetraction(List value) + public int getLeftStreamIndex() { - return null; + return 0; } } diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java new file mode 100644 index 0000000000..0ee3e00f63 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoLeftOuterJoin.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Left Outer join Accumulation for Pojo Streams. + * + */ +@InterfaceStability.Evolving +public class PojoLeftOuterJoin + extends AbstractPojoJoin +{ + public PojoLeftOuterJoin() + { + super(); + } + + public PojoLeftOuterJoin(int num, Class outClass, String... keys) + { + super(outClass,keys); + + } + + @Override + public void addNonMatchingResult(List result, Map requiredMap, Set nullFields) + { + for (Object field : nullFields) { + if (!keySetStream2.contains(field)) { + requiredMap.put(field.toString(), null); + } + } + result.add(requiredMap); + } + + @Override + public int getLeftStreamIndex() + { + return 0; + } + +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java new file mode 100644 index 0000000000..60b025285f --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/PojoRightOuterJoin.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Right outer join Accumulation for Pojo Streams. + * + */ +@InterfaceStability.Evolving +public class PojoRightOuterJoin + extends AbstractPojoJoin +{ + public PojoRightOuterJoin() + { + super(); + } + + public PojoRightOuterJoin(int num, Class outClass, String... keys) + { + super(outClass,keys); + + } + + @Override + public void addNonMatchingResult(List result, Map requiredMap, Set nullFields) + { + for (Object field : nullFields) { + if (!keySetStream1.contains(field)) { + requiredMap.put(field.toString(), null); + } + } + result.add(requiredMap); + } + + @Override + public int getLeftStreamIndex() + { + return 1; + } + +} diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java new file mode 100644 index 0000000000..aaa7de3d8e --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/PojoOuterJoinTest.java @@ -0,0 +1,280 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import java.util.List; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Test for POJO outer join accumulations + */ +public class PojoOuterJoinTest +{ + + public static class TestPojo1 + { + private int uId; + private String uName; + + public TestPojo1() + { + + } + + public TestPojo1(int id, String name) + { + this.uId = id; + this.uName = name; + } + + public int getUId() + { + return uId; + } + + public void setUId(int uId) + { + this.uId = uId; + } + + public String getUName() + { + return uName; + } + + public void setUName(String uName) + { + this.uName = uName; + } + } + + public static class TestPojo3 + { + private int uId; + private String uNickName; + private int age; + + public TestPojo3() + { + + } + + public TestPojo3(int id, String name, int age) + { + this.uId = id; + this.uNickName = name; + this.age = age; + } + + public int getUId() + { + return uId; + } + + public void setUId(int uId) + { + this.uId = uId; + } + + public String getUNickName() + { + return uNickName; + } + + public void setUNickName(String uNickName) + { + this.uNickName = uNickName; + } + + public int getAge() + { + return age; + } + + public void setAge(int age) + { + this.age = age; + } + } + + + public static class TestOutClass + { + private int uId; + private String uName; + private String uNickName; + private int age; + + public int getUId() + { + return uId; + } + + public void setUId(int uId) + { + this.uId = uId; + } + + public String getUName() + { + return uName; + } + + public void setUName(String uName) + { + this.uName = uName; + } + + public String getUNickName() + { + return uNickName; + } + + public void setUNickName(String uNickName) + { + this.uNickName = uNickName; + } + + public int getAge() + { + return age; + } + + public void setAge(int age) + { + this.age = age; + } + } + + public static class TestOutMultipleKeysClass + { + private int uId; + private String uName; + private int age; + + public int getUId() + { + return uId; + } + + public void setUId(int uId) + { + this.uId = uId; + } + + public String getUName() + { + return uName; + } + + public void setUName(String uName) + { + this.uName = uName; + } + + public int getAge() + { + return age; + } + + public void setAge(int age) + { + this.age = age; + } + } + + + @Test + public void PojoLeftOuterJoinTest() + { + PojoLeftOuterJoin pij = new PojoLeftOuterJoin<>(2, TestOutClass.class, "uId", "uId"); + + List>> accu = pij.defaultAccumulatedValue(); + + Assert.assertEquals(2, accu.size()); + + accu = pij.accumulate(accu, new TestPojo1(1, "Josh")); + accu = pij.accumulate(accu, new TestPojo1(2, "Bob")); + + accu = pij.accumulate2(accu, new TestPojo3(1, "NickJosh", 12)); + accu = pij.accumulate2(accu, new TestPojo3(3, "NickBob", 13)); + + Assert.assertEquals(2, pij.getOutput(accu).size()); + + Object o = pij.getOutput(accu).get(1); + Assert.assertTrue(o instanceof TestOutClass); + TestOutClass testOutClass = (TestOutClass)o; + Assert.assertEquals(2, testOutClass.getUId()); + Assert.assertEquals("Bob", testOutClass.getUName()); + Assert.assertEquals(0, testOutClass.getAge()); + } + + @Test + public void PojoRightOuterJoinTest() + { + PojoRightOuterJoin pij = new PojoRightOuterJoin<>(2, TestOutClass.class, "uId", "uId"); + + List>> accu = pij.defaultAccumulatedValue(); + + Assert.assertEquals(2, accu.size()); + + accu = pij.accumulate(accu, new TestPojo1(1, "Josh")); + accu = pij.accumulate(accu, new TestPojo1(2, "Bob")); + + accu = pij.accumulate2(accu, new TestPojo3(1, "NickJosh", 12)); + accu = pij.accumulate2(accu, new TestPojo3(3, "NickBob", 13)); + + Assert.assertEquals(2, pij.getOutput(accu).size()); + + Object o = pij.getOutput(accu).get(1); + Assert.assertTrue(o instanceof TestOutClass); + TestOutClass testOutClass = (TestOutClass)o; + Assert.assertEquals(3, testOutClass.getUId()); + Assert.assertEquals(null, testOutClass.getUName()); + Assert.assertEquals(13, testOutClass.getAge()); + } + + @Test + public void PojoFullOuterJoinTest() + { + PojoFullOuterJoin pij = new PojoFullOuterJoin<>(2, TestOutClass.class, "uId", "uId"); + + List>> accu = pij.defaultAccumulatedValue(); + + Assert.assertEquals(2, accu.size()); + + accu = pij.accumulate(accu, new TestPojo1(1, "Josh")); + accu = pij.accumulate(accu, new TestPojo1(2, "Bob")); + + accu = pij.accumulate2(accu, new TestPojo3(1, "NickJosh", 12)); + accu = pij.accumulate2(accu, new TestPojo3(3, "NickBob", 13)); + + Assert.assertEquals(3, pij.getOutput(accu).size()); + + Object o = pij.getOutput(accu).get(1); + Assert.assertTrue(o instanceof TestOutClass); + TestOutClass testOutClass = (TestOutClass)o; + Assert.assertEquals(2, testOutClass.getUId()); + Assert.assertEquals("Bob", testOutClass.getUName()); + Assert.assertEquals(0, testOutClass.getAge()); + } +}