Skip to content

Commit

Permalink
add vertex data ctx (alibaba#4)
Browse files Browse the repository at this point in the history
* add vertex data ctx

* update

* app extends vertex data context

* revert

* bugfix

* fix

* fix

* add finalize method to save result

* disable asan

* add context_type method

* change type

* add get data method

* cleanup

* cleanup

* optimizations

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* expose ctx data type

* move set_fragment to Init

* remove ctx_data_t

* Revert "remove ctx_data_t"

This reverts commit 70280df.

* cleanup

* add dummy data_t

* add dummy data_t

* cleanup

* fix

* refactoring ctx

* fix

* fix

* refine

* refine

* refine

* refactor all

* fix

* fix

* fix

* fix

* make changes according the review

* change return type of context_type

* cleanup

Co-authored-by: guanyi.gl <[email protected]>
  • Loading branch information
pwrliang and guanyi.gl authored Oct 13, 2020
1 parent f721f02 commit 367ea5e
Show file tree
Hide file tree
Showing 35 changed files with 413 additions and 166 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/c-cpp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
run: |
mkdir build
cd build
cmake .. -DCMAKE_BUILD_TYPE=Debug -DWITH_ASAN=ON
cmake .. -DCMAKE_BUILD_TYPE=Debug
make cpplint
make
- name: App Test
Expand Down
17 changes: 12 additions & 5 deletions examples/analytical_apps/bfs/bfs_auto_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,21 @@ namespace grape {
* @tparam FRAG_T
*/
template <typename FRAG_T>
class BFSAutoContext : public ContextBase<FRAG_T> {
class BFSAutoContext : public VertexDataContext<FRAG_T, int64_t> {
public:
using oid_t = typename FRAG_T::oid_t;
using vid_t = typename FRAG_T::vid_t;

void Init(const FRAG_T& frag, AutoParallelMessageManager<FRAG_T>& messages,
oid_t src_id) {
source_id = src_id;
explicit BFSAutoContext(const FRAG_T& fragment)
: VertexDataContext<FRAG_T, int64_t>(fragment),
partial_result(this->data()) {}

void Init(AutoParallelMessageManager<FRAG_T>& messages,
oid_t src_id) {
auto &frag = this->fragment();
auto vertices = frag.Vertices();

source_id = src_id;
partial_result.Init(vertices, std::numeric_limits<int64_t>::max(),
[](int64_t* lhs, int64_t rhs) {
if (*lhs > rhs) {
Expand All @@ -51,8 +56,10 @@ class BFSAutoContext : public ContextBase<FRAG_T> {
MessageStrategy::kSyncOnOuterVertex);
}

void Output(const FRAG_T& frag, std::ostream& os) {
void Output(std::ostream& os) override {
auto &frag = this->fragment();
auto inner_vertices = frag.InnerVertices();

for (auto v : inner_vertices) {
os << frag.GetId(v) << " " << partial_result[v] << std::endl;
}
Expand Down
22 changes: 13 additions & 9 deletions examples/analytical_apps/bfs/bfs_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,22 @@ namespace grape {
* @tparam FRAG_T
*/
template <typename FRAG_T>
class BFSContext : public ContextBase<FRAG_T> {
class BFSContext : public VertexDataContext<FRAG_T, int64_t> {
public:
using depth_type = int64_t;
using oid_t = typename FRAG_T::oid_t;
using vid_t = typename FRAG_T::vid_t;

void Init(const FRAG_T& frag, ParallelMessageManager& messages,
oid_t src_id) {
source_id = src_id;
explicit BFSContext(const FRAG_T& fragment)
: VertexDataContext<FRAG_T, int64_t>(fragment, true),
partial_result(this->data()) {}

auto vertices = frag.Vertices();
partial_result.Init(vertices, std::numeric_limits<depth_type>::max());
void Init(ParallelMessageManager& messages,
oid_t src_id) {
auto &frag = this->fragment();

source_id = src_id;
partial_result.SetValue(std::numeric_limits<depth_type>::max());
avg_degree = static_cast<double>(frag.GetEdgeNum()) /
static_cast<double>(frag.GetInnerVerticesNum());

Expand All @@ -50,12 +53,13 @@ class BFSContext : public ContextBase<FRAG_T> {
#endif
}

void Output(const FRAG_T& frag, std::ostream& os) {
void Output(std::ostream& os) override {
auto &frag = this->fragment();
auto inner_vertices = frag.InnerVertices();

for (auto v : inner_vertices) {
os << frag.GetId(v) << " " << partial_result[v] << std::endl;
}

#ifdef PROFILING
VLOG(2) << "preprocess_time: " << preprocess_time << "s.";
VLOG(2) << "exec_time: " << exec_time << "s.";
Expand All @@ -64,7 +68,7 @@ class BFSContext : public ContextBase<FRAG_T> {
}

oid_t source_id;
typename FRAG_T::template vertex_array_t<depth_type> partial_result;
typename FRAG_T::template vertex_array_t<depth_type>& partial_result;
DenseVertexSet<vid_t> curr_inner_updated, next_inner_updated;

depth_type current_depth = 0;
Expand Down
17 changes: 12 additions & 5 deletions examples/analytical_apps/cdlp/cdlp_auto_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ namespace grape {
* @tparam FRAG_T
*/
template <typename FRAG_T>
class CDLPAutoContext : public ContextBase<FRAG_T> {
class CDLPAutoContext
: public VertexDataContext<FRAG_T, typename FRAG_T::oid_t> {
public:
using oid_t = typename FRAG_T::oid_t;
using vid_t = typename FRAG_T::vid_t;
Expand All @@ -35,12 +36,16 @@ class CDLPAutoContext : public ContextBase<FRAG_T> {
using label_t = oid_t;
#endif

void Init(const FRAG_T& frag, AutoParallelMessageManager<FRAG_T>& messages,
int max_round) {
this->max_round = max_round;
explicit CDLPAutoContext(const FRAG_T& fragment)
: VertexDataContext<FRAG_T, typename FRAG_T::oid_t>(fragment, true),
labels(this->data()) {}

void Init(AutoParallelMessageManager<FRAG_T>& messages, int max_round) {
auto& frag = this->fragment();
auto vertices = frag.Vertices();
auto inner_vertices = frag.InnerVertices();

this->max_round = max_round;
labels.Init(vertices, 0, [](label_t* lhs, label_t rhs) {
*lhs = rhs;
return true;
Expand Down Expand Up @@ -72,8 +77,10 @@ class CDLPAutoContext : public ContextBase<FRAG_T> {
step = 0;
}

void Output(const FRAG_T& frag, std::ostream& os) {
void Output(std::ostream& os) override {
auto& frag = this->fragment();
auto inner_vertices = frag.InnerVertices();

for (auto v : inner_vertices) {
os << frag.GetId(v) << " " << labels[v] << std::endl;
}
Expand Down
19 changes: 12 additions & 7 deletions examples/analytical_apps/cdlp/cdlp_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace grape {
* @tparam FRAG_T
*/
template <typename FRAG_T>
class CDLPContext : public ContextBase<FRAG_T> {
class CDLPContext : public VertexDataContext<FRAG_T, typename FRAG_T::oid_t> {
public:
using oid_t = typename FRAG_T::oid_t;
using vid_t = typename FRAG_T::vid_t;
Expand All @@ -37,13 +37,16 @@ class CDLPContext : public ContextBase<FRAG_T> {
#else
using label_t = oid_t;
#endif
void Init(const FRAG_T& frag, ParallelMessageManager& messages,
explicit CDLPContext(const FRAG_T& fragment)
: VertexDataContext<FRAG_T, typename FRAG_T::oid_t>(fragment, true),
labels(this->data()) {}

void Init(ParallelMessageManager& messages,
int max_round) {
this->max_round = max_round;
auto &frag = this->fragment();
auto inner_vertices = frag.InnerVertices();
auto vertices = frag.Vertices();

labels.Init(vertices);
this->max_round = max_round;
changed.Init(inner_vertices);

#ifdef PROFILING
Expand All @@ -54,14 +57,16 @@ class CDLPContext : public ContextBase<FRAG_T> {
step = 0;
}

void Output(const FRAG_T& frag, std::ostream& os) {
void Output(std::ostream& os) override {
auto &frag = this->fragment();
auto inner_vertices = frag.InnerVertices();

for (auto v : inner_vertices) {
os << frag.GetId(v) << " " << labels[v] << std::endl;
}
}

typename FRAG_T::template vertex_array_t<label_t> labels;
typename FRAG_T::template vertex_array_t<label_t>& labels;
typename FRAG_T::template vertex_array_t<bool> changed;

#ifdef PROFILING
Expand Down
16 changes: 14 additions & 2 deletions examples/analytical_apps/lcc/lcc.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,20 @@ class LCC : public ParallelAppBase<FRAG_T, LCCContext<FRAG_T>>,
ctx.preprocess_time += GetCurrentTime();
#endif
} else {
messages.ParallelProcess<fragment_t, int>(
thread_num(), frag, [](int tid, vertex_t u, int) {});
auto& global_degree = ctx.global_degree;
auto& tricnt = ctx.tricnt;
auto& ctx_data = ctx.data();

for (auto v : inner_vertices) {
if (global_degree[v] == 0 || global_degree[v] == 1) {
ctx_data[v] = 0;
} else {
double re = 2.0 * (tricnt[v]) /
(static_cast<int64_t>(global_degree[v]) *
(static_cast<int64_t>(global_degree[v]) - 1));
ctx_data[v] = re;
}
}
}
}
};
Expand Down
14 changes: 14 additions & 0 deletions examples/analytical_apps/lcc/lcc_auto.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,20 @@ class LCCAuto : public AutoAppBase<FRAG_T, LCCAutoContext<FRAG_T>> {
}
} else if (ctx.stage == 2) {
ctx.stage = 3;
auto& global_degree = ctx.global_degree;
auto& tricnt = ctx.tricnt;
auto& ctx_data = ctx.data();

for (auto v : inner_vertices) {
if (global_degree[v] == 0 || global_degree[v] == 1) {
ctx_data[v] = 0;
} else {
double re = 2.0 * (tricnt[v]) /
(static_cast<int64_t>(global_degree[v]) *
(static_cast<int64_t>(global_degree[v]) - 1));
ctx_data[v] = re;
}
}
}
}
};
Expand Down
16 changes: 11 additions & 5 deletions examples/analytical_apps/lcc/lcc_auto_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,18 @@ namespace grape {
* @tparam FRAG_T
*/
template <typename FRAG_T>
class LCCAutoContext : public ContextBase<FRAG_T> {
class LCCAutoContext : public VertexDataContext<FRAG_T, double> {
public:
using oid_t = typename FRAG_T::oid_t;
using vid_t = typename FRAG_T::vid_t;

void Init(const FRAG_T& frag, AutoParallelMessageManager<FRAG_T>& messages) {
explicit LCCAutoContext(const FRAG_T& fragment)
: VertexDataContext<FRAG_T, double>(fragment) {}

void Init(AutoParallelMessageManager<FRAG_T>& messages) {
auto &frag = this->fragment();
auto vertices = frag.Vertices();

global_degree.Init(vertices, 0, [](int* lhs, int rhs) {
*lhs = rhs;
return true;
Expand All @@ -59,16 +64,17 @@ class LCCAutoContext : public ContextBase<FRAG_T> {
MessageStrategy::kSyncOnOuterVertex);
}

void Output(const FRAG_T& frag, std::ostream& os) {
void Output(std::ostream& os) override {
auto& frag = this->fragment();
auto inner_vertices = frag.InnerVertices();
for (auto v : inner_vertices) {
if (global_degree[v] == 0 || global_degree[v] == 1) {
os << frag.GetId(v) << " " << std::scientific << std::setprecision(15)
<< 0.0 << std::endl;
} else {
double re = 2.0 * (tricnt[v]) /
(static_cast<int64_t>(global_degree[v]) *
(static_cast<int64_t>(global_degree[v]) - 1));
(static_cast<int64_t>(global_degree[v]) *
(static_cast<int64_t>(global_degree[v]) - 1));
os << frag.GetId(v) << " " << std::scientific << std::setprecision(15)
<< re << std::endl;
}
Expand Down
16 changes: 11 additions & 5 deletions examples/analytical_apps/lcc/lcc_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,35 @@ namespace grape {
* @tparam FRAG_T
*/
template <typename FRAG_T>
class LCCContext : public ContextBase<FRAG_T> {
class LCCContext : public VertexDataContext<FRAG_T, double> {
public:
using oid_t = typename FRAG_T::oid_t;
using vid_t = typename FRAG_T::vid_t;
using vertex_t = typename FRAG_T::vertex_t;

void Init(const FRAG_T& frag, ParallelMessageManager& messages) {
explicit LCCContext(const FRAG_T& fragment)
: VertexDataContext<FRAG_T, double>(fragment) {}

void Init(ParallelMessageManager& messages) {
auto &frag = this->fragment();
auto vertices = frag.Vertices();

global_degree.Init(vertices);
complete_neighbor.Init(vertices);
tricnt.Init(vertices, 0);
}

void Output(const FRAG_T& frag, std::ostream& os) {
void Output(std::ostream& os) override {
auto& frag = this->fragment();
auto inner_vertices = frag.InnerVertices();
for (auto v : inner_vertices) {
if (global_degree[v] == 0 || global_degree[v] == 1) {
os << frag.GetId(v) << " " << std::scientific << std::setprecision(15)
<< 0.0 << std::endl;
} else {
double re = 2.0 * (tricnt[v]) /
(static_cast<int64_t>(global_degree[v]) *
(static_cast<int64_t>(global_degree[v]) - 1));
(static_cast<int64_t>(global_degree[v]) *
(static_cast<int64_t>(global_degree[v]) - 1));
os << frag.GetId(v) << " " << std::scientific << std::setprecision(15)
<< re << std::endl;
}
Expand Down
16 changes: 13 additions & 3 deletions examples/analytical_apps/pagerank/pagerank.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,22 @@ class PageRank : public BatchShuffleAppBase<FRAG_T, PageRankContext<FRAG_T>>,
#ifdef PROFILING
ctx.postprocess_time -= GetCurrentTime();
#endif
ctx.result.Swap(ctx.next_result);

if (ctx.step != ctx.max_round) {
messages.SyncInnerVertices<fragment_t, double>(frag, ctx.next_result,
messages.SyncInnerVertices<fragment_t, double>(frag, ctx.result,
thread_num());
}
} else {
auto& degree = ctx.degree;
auto& result = ctx.result;

ctx.result.Swap(ctx.next_result);
for (auto v : inner_vertices) {
if (degree[v] != 0) {
result[v] *= degree[v];
}
}
return;
}
#ifdef PROFILING
ctx.postprocess_time += GetCurrentTime();
#endif
Expand Down
8 changes: 8 additions & 0 deletions examples/analytical_apps/pagerank/pagerank_auto.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ class PageRankAuto : public AutoAppBase<FRAG_T, PageRankAutoContext<FRAG_T>>,

++ctx.step;
if (ctx.step > ctx.max_round) {
auto& degree = ctx.degree;
auto& results = ctx.results;

for (auto v : inner_vertices) {
if (degree[v] != 0) {
results[v] *= degree[v];
}
}
return;
}

Expand Down
Loading

0 comments on commit 367ea5e

Please sign in to comment.