Channel
Channel is one of the most important concepts in Husky. It defines how Object Lists communicate with each other.
If you consider each Object List as a vertex in a workflow graph, Channels serve as the directed edges between the vertices. They define how messages flow from one end to the other.
Currently, four kinds of Channels are available in Husky:
- Push Channel
- Push Combined Channel
- Broadcast Channel
- Migrate Channel
All Channels should be created through the ChannelFactory. The life cycle of a channel is not bound to the scope in which it is created, so if you really want to destroy a channel, you need to call the drop_channel function.
The APIs for ChannelFactory are listed below (the ChannelFactory namespace is omitted):
// Create PushChannel
template <typename MsgT, typename DstObjT>
static PushChannel<MsgT, DstObjT>&
create_push_channel(ChannelSource& src_list,
ObjList<DstObjT>& dst_list,
const std::string& name = "");
// Create PushCombinedChannel
template <typename MsgT, typename CombineT, typename DstObjT>
static PushCombinedChannel<MsgT, DstObjT, CombineT>&
create_push_combined_channel(ChannelSource& src_list,
ObjList<DstObjT>& dst_list,
const std::string& name = "");
// Create MigrateChannel
template <typename ObjT>
static MigrateChannel<ObjT>&
create_migrate_channel(ObjList<ObjT>& src_list,
ObjList<ObjT>& dst_list,
const std::string& name = "");
// Create BroadcastChannel
template <typename KeyT, typename MsgT>
static BroadcastChannel<KeyT, MsgT>&
create_broadcast_channel(ChannelSource& src_list,
const std::string& name = "");
// Get PushChannel through name
template <typename MsgT, typename DstObjT>
static PushChannel<MsgT, DstObjT>&
get_push_channel(const std::string& name = "");
// Get PushCombinedChannel through name
template <typename MsgT, typename CombineT, typename DstObjT>
static PushCombinedChannel<MsgT, DstObjT, CombineT>&
get_push_combined_channel(const std::string& name = "");
// Get MigrateChannel through name
template <typename ObjT>
static MigrateChannel<ObjT>&
get_migrate_channel(const std::string& name = "");
// Get BroadcastChannel through name
template <typename KeyT, typename MsgT>
static BroadcastChannel<KeyT, MsgT>&
get_broadcast_channel(const std::string& name = "");
// Drop channel through name
static void drop_channel(const std::string& name);
As you can see, a channel can be created without a name. In this case, ChannelFactory automatically generates a name for your channel. Since you don't know the generated name, you cannot get or drop the channel by name. To keep things simple, channel names are omitted in the examples below.
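The name-based life cycle described above can be illustrated with a toy registry. This is only a sketch under stated assumptions, not Husky's ChannelFactory: `ToyChannel`, `ToyChannelFactory`, the `toy_channel_` prefix for generated names, and the choice to throw on duplicate or unknown names are all hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for a channel object; not a Husky type.
struct ToyChannel {
    std::string name;
};

// Toy registry mimicking create/get/drop by name. Not Husky's ChannelFactory.
class ToyChannelFactory {
   public:
    // An empty name triggers an auto-generated one (the prefix is made up).
    static ToyChannel& create_channel(const std::string& name = "") {
        const std::string key = name.empty() ? "toy_channel_" + std::to_string(next_id()++) : name;
        if (registry().count(key) != 0)
            throw std::invalid_argument("channel already exists: " + key);  // choice of this sketch
        std::unique_ptr<ToyChannel>& slot = registry()[key];
        slot.reset(new ToyChannel{key});
        return *slot;
    }

    static ToyChannel& get_channel(const std::string& name) {
        auto it = registry().find(name);
        if (it == registry().end())
            throw std::out_of_range("no such channel: " + name);
        return *it->second;
    }

    // The registry owns the channel, so dropping is the only way to destroy it.
    static void drop_channel(const std::string& name) { registry().erase(name); }

    static std::size_t size() { return registry().size(); }

   private:
    using Registry = std::map<std::string, std::unique_ptr<ToyChannel>>;
    static Registry& registry() {
        static Registry r;
        return r;
    }
    static int& next_id() {
        static int id = 0;
        return id;
    }
};

// Usage sketch: a named channel outlives the scope that created it.
std::string demo_factory() {
    {
        ToyChannelFactory::create_channel("ranks");  // the returned reference is discarded here
    }
    assert(ToyChannelFactory::get_channel("ranks").name == "ranks");  // still registered
    ToyChannelFactory::drop_channel("ranks");                          // explicit destruction
    return ToyChannelFactory::create_channel().name;                   // auto-generated name
}
```

In this toy, the unnamed channel can only be reached through the reference returned at creation time, which mirrors why the text recommends naming a channel if you intend to get or drop it later.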
PushChannel is arguably the most frequently used Channel. You can use PushChannel to send messages from one ObjList to another ObjList. Since the destination ObjList receives messages from different machines, it needs to be globally indexed, so the destination ObjList must be globalized.
To create a PushChannel, you need to specify the message type you want to pass through the Channel, as well as the source ObjList and the destination ObjList. You can create a PushChannel of integers from src_list to dst_list like this:
auto& ch = ChannelFactory::create_push_channel<int>(src_list, dst_list);
Then, when you list_execute over the src_list, you can push messages into the channel. For each message, you need to decide which Object in the destination ObjList should receive it.
list_execute(src_list, [&ch](Obj& obj) {
ch.push(msg, key); // send msg to key
});
After the list_execute over the src_list, you can list_execute over the dst_list. All the messages you passed through the PushChannel are now available to the dst_list.
list_execute(dst_list, [&ch](Obj& obj) {
auto& msgs = ch.get(obj); // The msgs is of type std::vector<MsgT>, MsgT is int in this case
});
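The push-then-get pattern above can be mimicked in a single process. The sketch below is a toy model, not Husky code: `ToyPushChannel` and `flush()` are hypothetical names, and `flush()` stands in for whatever delivery takes place between the two list_execute calls.

```cpp
#include <cassert>
#include <map>
#include <utility>
#include <vector>

// Toy single-process model of a push channel; not Husky's PushChannel.
template <typename MsgT, typename KeyT>
class ToyPushChannel {
   public:
    // Buffer a message addressed to the destination object with this key.
    void push(const MsgT& msg, const KeyT& key) { outgoing_.emplace_back(key, msg); }

    // Hypothetical delivery step: group buffered messages by destination key.
    void flush() {
        inbox_.clear();
        for (const auto& kv : outgoing_) inbox_[kv.first].push_back(kv.second);
        outgoing_.clear();
    }

    // All messages delivered to `key`, or an empty vector if there are none.
    const std::vector<MsgT>& get(const KeyT& key) const {
        static const std::vector<MsgT> empty;
        auto it = inbox_.find(key);
        return it == inbox_.end() ? empty : it->second;
    }

   private:
    std::vector<std::pair<KeyT, MsgT>> outgoing_;
    std::map<KeyT, std::vector<MsgT>> inbox_;
};

// Usage sketch: four "source objects" push to two destination keys.
std::vector<int> demo_push() {
    ToyPushChannel<int, int> ch;
    for (int src = 0; src < 4; ++src) ch.push(src * 10, src % 2);
    assert(ch.get(0).empty());  // nothing is visible before delivery
    ch.flush();
    return ch.get(0);  // messages addressed to key 0
}
```

The point of the model is the two-phase shape: every push happens in one pass, and the destination only reads its messages in a later pass.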
It is also worth noting that the source of a channel can also be an InputFormat.
HDFSLineInputFormat infmt;
infmt.set_input(Context::get_param("input"));
ObjList<Word> word_list;
auto& ch = ChannelFactory::create_push_combined_channel<int, SumCombiner<int>>(infmt, word_list);
Then, you can just use the channel's push function inside the load function.
PushCombinedChannel is almost the same as PushChannel, except that you need to provide an additional type that defines how messages sent to the same key are combined.
auto& ch2 = ChannelFactory::create_push_combined_channel<int, SumCombiner<int>>(src_list, dst_list);
list_execute(src_list, [&ch2](Obj& obj) {
ch2.push(msg, key); // send msg to key
});
list_execute(dst_list, [&ch2](Obj& obj) {
auto combined_msg = ch2.get(obj);
});
Notice that an additional type, SumCombiner<int>, is provided. This type specifies that the messages sent to the same key are combined using the sum operator.
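To make the role of the combiner type concrete, here is a toy model in which the combiner is a type with a static `combine` function. The names `ToySumCombiner`, `ToyMaxCombiner` and `ToyPushCombinedChannel` are hypothetical, and the `combine(val, inc)` shape is an assumption of this sketch rather than a statement about Husky's own combiner interface.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical combiner: folds an incoming message into the running value by addition.
template <typename T>
struct ToySumCombiner {
    static void combine(T& val, const T& inc) { val += inc; }
};

// Hypothetical combiner: keeps the larger of the two values.
template <typename T>
struct ToyMaxCombiner {
    static void combine(T& val, const T& inc) {
        if (val < inc) val = inc;
    }
};

// Toy model of a combined push channel; not Husky's PushCombinedChannel.
template <typename MsgT, typename KeyT, typename CombineT>
class ToyPushCombinedChannel {
   public:
    // The first message for a key is stored; later ones are merged into it.
    void push(const MsgT& msg, const KeyT& key) {
        auto it = combined_.find(key);
        if (it == combined_.end())
            combined_.emplace(key, msg);
        else
            CombineT::combine(it->second, msg);
    }

    // One combined message per key (throws std::out_of_range if none was pushed).
    const MsgT& get(const KeyT& key) const { return combined_.at(key); }

   private:
    std::map<KeyT, MsgT> combined_;
};

// Usage sketch: word-count style accumulation.
int demo_combined() {
    ToyPushCombinedChannel<int, std::string, ToySumCombiner<int>> ch;
    ch.push(1, "husky");
    ch.push(1, "husky");
    ch.push(1, "channel");
    return ch.get("husky");
}
```

Swapping `ToySumCombiner` for `ToyMaxCombiner` changes the result without touching the push or get calls, which is the design benefit of passing the combiner as a type.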
Objects in an ObjList are free to move. You can use MigrateChannel to migrate specific Objects to a given thread.
When creating a MigrateChannel, you need to give the Object type as well as the source ObjList and the destination ObjList.
auto& ch3 = ChannelFactory::create_migrate_channel<Obj>(src_list, dst_list);
list_execute(src_list, [&ch3](Obj& obj) {
if (obj.id() == 0)
ch3.migrate(obj, 0); // Migrate to thread 0
});
list_execute(dst_list, [&ch3](Obj& obj) {
// You will have the migrated Object here
});
The migrate function migrates an object to the thread with the given thread_id.
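A migration round can be pictured with per-thread partitions. The sketch below is a toy model with hypothetical names (`ToyObj`, `Partitions`, `ToyMigrateChannel`, `deliver()`). It also assumes that a migrated object leaves its source partition, which the text above does not state explicitly.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical object type with an id; not a Husky type.
struct ToyObj {
    int id;
};

// One vector of objects per thread.
using Partitions = std::vector<std::vector<ToyObj>>;

// Toy model of a migrate channel; not Husky's MigrateChannel.
class ToyMigrateChannel {
   public:
    explicit ToyMigrateChannel(std::size_t num_threads) : in_flight_(num_threads) {}

    // Queue a copy of obj for the destination thread (throws if the id is out of range).
    void migrate(const ToyObj& obj, std::size_t dst_thread_id) {
        in_flight_.at(dst_thread_id).push_back(obj);
    }

    // Hypothetical delivery step: append queued objects to the destination partitions.
    void deliver(Partitions& dst) {
        for (std::size_t t = 0; t < in_flight_.size(); ++t) {
            dst.at(t).insert(dst.at(t).end(), in_flight_[t].begin(), in_flight_[t].end());
            in_flight_[t].clear();
        }
    }

   private:
    Partitions in_flight_;
};

// Usage sketch: the object with id 0 moves from thread 1 to thread 0.
Partitions demo_migrate() {
    Partitions src(2);
    src[1] = {ToyObj{0}, ToyObj{1}};  // thread 1 owns both objects
    Partitions dst(2);
    ToyMigrateChannel ch(2);
    for (auto& part : src) {
        std::vector<ToyObj> kept;
        for (const ToyObj& obj : part) {
            if (obj.id == 0)
                ch.migrate(obj, 0);  // migrate to thread 0
            else
                kept.push_back(obj);  // assumption: only non-migrated objects stay
        }
        part.swap(kept);
    }
    ch.deliver(dst);
    assert(src[1].size() == 1);  // object 1 stayed behind
    return dst;
}
```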
You can use BroadcastChannel to broadcast key/value pairs. After broadcasting, the key/value pairs are globally available.
Therefore, a BroadcastChannel needs no destination ObjList; the source ObjList is enough. You also need to specify the key type and the value type of the BroadcastChannel. In the example below, we create a BroadcastChannel with int as the key type and std::string as the value type. The source ObjList is src_list.
auto& ch4 = ChannelFactory::create_broadcast_channel<int, std::string>(src_list);
list_execute(src_list, [&ch4](Obj& obj) {
ch4.broadcast(key, value); // broadcast key, value pair
});
list_execute(src_list, [&ch4](Obj& obj) {
auto msg = ch4.get(key); // get the broadcasted value through key.
});
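The broadcast-then-get pattern can be sketched as a two-stage key/value store. This is a toy single-process model with hypothetical names (`ToyBroadcastChannel`, `flush()`, `contains()`). That values only become visible after an explicit delivery step is an assumption of the sketch.

```cpp
#include <cassert>
#include <map>
#include <string>

// Toy model of a broadcast channel; not Husky's BroadcastChannel.
template <typename KeyT, typename ValueT>
class ToyBroadcastChannel {
   public:
    // Stage a key/value pair for broadcasting.
    void broadcast(const KeyT& key, const ValueT& value) { pending_[key] = value; }

    // Hypothetical delivery step: make staged pairs visible to every reader.
    void flush() {
        for (const auto& kv : pending_) visible_[kv.first] = kv.second;
        pending_.clear();
    }

    bool contains(const KeyT& key) const { return visible_.count(key) != 0; }

    // Look up a broadcast value by key (throws std::out_of_range if absent).
    const ValueT& get(const KeyT& key) const { return visible_.at(key); }

   private:
    std::map<KeyT, ValueT> pending_;
    std::map<KeyT, ValueT> visible_;
};

// Usage sketch: one pass broadcasts, a later pass reads.
std::string demo_broadcast() {
    ToyBroadcastChannel<int, std::string> ch;
    ch.broadcast(42, "hello");
    assert(!ch.contains(42));  // not visible until delivered in this toy
    ch.flush();
    return ch.get(42);
}
```

Unlike the push sketches, no destination key space is involved: any reader can look up any key, which matches the absence of a destination ObjList in the channel's creation call.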
The channel APIs that users will normally need are listed below:
// PushChannel
void push(const MsgT& msg, const typename DstObjT::KeyT& key);
const std::vector<MsgT>& get(const DstObjT& obj) const;
// PushCombinedChannel
void push(const MsgT& msg, const typename DstObjT::KeyT& key);
const MsgT& get(const DstObjT& obj) const;
// MigrateChannel
void migrate(ObjT& obj, int dst_thread_id);
// BroadcastChannel
void broadcast(const KeyT& key, const ValueT& value);
ValueT& get(const KeyT& key);