forked from MorganBauer/COP-6726-Databases-Project
-
Notifications
You must be signed in to change notification settings - Fork 0
/
BigQ.h
138 lines (126 loc) · 3.69 KB
/
BigQ.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#ifndef BIGQ_H
#define BIGQ_H
#include <pthread.h>
#include <iostream>
#include <vector>
#include <utility>
#include "Pipe.h"
#include "File.h"
#include "Record.h"
#include <functional>
#include <cstdlib>
using namespace std;
/**
 * Cursor over one sorted run stored in the partially sorted file.
 *
 * A run spans the pages [start_offset, end_offset) of the file; the end is
 * exclusive because getNextRecord() only loads pages whose index is below it.
 * The page at cur_offset is buffered in `p` and records are pulled from it
 * one at a time.
 *
 * Copying or assigning a Run hands the source's buffered page to
 * File::AddPage at cur_offset and then loads that page with File::GetPage.
 * NOTE(review): this looks like a write/read round trip standing in for a
 * Page copy, which means copies touch the file -- confirm against File/Page.
 */
class Run
{
private:
    int runID;                    // identifier, only used in diagnostics
    off_t start_offset;           // first page of the run
    off_t end_offset;             // one past the last page of the run
    off_t cur_offset;             // page currently buffered in p
    Page p;                       // buffered page at cur_offset
    Record r;                     // not used by any method in this class
    bool empty;                   // initialised to false and copied; never updated here
    File * partiallySortedFile;   // not owned; the destructor does not release it

    // Shared by the copy constructor and copy assignment: pass rn's buffered
    // page through the file at cur_offset and load it into our own buffer.
    // The const_cast is needed because AddPage takes a non-const Page *.
    void clonePageFrom(const Run & rn)
    {
        partiallySortedFile->AddPage(const_cast<Page *>(&rn.p), cur_offset);
        partiallySortedFile->GetPage(&p, cur_offset);
    }

public:
    /**
     * Opens run `rID` covering pages [start, end) of `_partiallySortedFile`
     * and immediately buffers the page at `start`.
     */
    Run(int rID, off_t start, off_t end, File * _partiallySortedFile) :
        runID(rID), start_offset(start), end_offset(end), cur_offset(start),
        p(), r(), empty(false), partiallySortedFile(_partiallySortedFile)
    {
        partiallySortedFile->GetPage(&p, start_offset);
    }

    /** Copies the cursor state of `rn`, including its buffered page (see class note). */
    Run(const Run & rn) :
        runID(rn.runID), start_offset(rn.start_offset), end_offset(rn.end_offset),
        cur_offset(rn.cur_offset), p(), r(), empty(rn.empty),
        partiallySortedFile(rn.partiallySortedFile)
    {
        clonePageFrom(rn);
    }

    /** Assigns the cursor state of `rn`; self-assignment is a no-op. */
    Run & operator= (const Run & rn)
    {
        // Without this guard `x = x` would push its own page through the file
        // and read it back for no benefit.
        if (this == &rn)
        {
            return *this;
        }
        runID = rn.runID;
        start_offset = rn.start_offset;
        end_offset = rn.end_offset;
        cur_offset = rn.cur_offset;
        empty = rn.empty;
        partiallySortedFile = rn.partiallySortedFile;
        clonePageFrom(rn);
        return *this;
    }

    ~Run () {}

    /**
     * Fetches the next record of the run into `ret`.
     *
     * @return true if a record was stored in `ret`, false once every page of
     *         the run has been consumed.
     *
     * Terminates the process with status -1 (after reporting on stderr) when
     * a page inside the run's range yields no record.
     */
    bool getNextRecord(Record & ret)
    {
        if (1 == p.GetFirst(&ret))
        {
            return true;
        }

        // Buffered page is drained: step to the next page. The offset is
        // clamped at end_offset so repeated calls on an exhausted run do not
        // keep pushing cur_offset further past the run's range.
        if (cur_offset < end_offset)
        {
            ++cur_offset;
        }
        if (cur_offset >= end_offset)
        {
            return false;
        }

        partiallySortedFile->GetPage(&p, cur_offset);
        if (1 == p.GetFirst(&ret))
        {
            return true;
        }

        // A page inside the run produced nothing; treated as fatal.
        std::cerr << "Run " << runID << ": page " << cur_offset
                  << " inside the run yielded no record, aborting" << std::endl;
        std::exit(-1);
    }

    /** Writes the run id and its page range to stdout. */
    void print() const
    {
        std::cout << "Run " << runID << ", starting at " << start_offset << " and running to " << end_offset << std::endl;
    }
};
/**
 * A Record paired with the integer id of the run it was taken from.
 *
 * The record is a public member so comparators can reach it directly; the
 * run id is read through getRun().
 */
class TaggedRecord
{
private:
    int run;   // id of the originating run

public:
    Record r;  // copy of the record supplied at construction

    /** @return the run id given at construction. */
    int getRun() const { return run; }

    /**
     * @param _r    record to copy into this object
     * @param _run  id of the run the record belongs to
     *
     * The record is taken by const reference so it is copied once (into `r`)
     * instead of once into the parameter and again into the member.
     */
    TaggedRecord(const Record & _r, int _run) : run(_run), r(_r)
    { }
};
/**
 * Ordering functor over TaggedRecord: x is ordered before y when
 * ComparisonEngine::Compare on their records, under the stored OrderMaker,
 * returns a positive value.
 *
 * NOTE(review): std::binary_function is deprecated since C++11 and removed in
 * C++17; the base is kept here so the type's interface stays unchanged.
 */
class TaggedRecordCompare : public std::binary_function <TaggedRecord, TaggedRecord, bool>
{
private:
    // Stored by value. The constructor receives the OrderMaker by value, so
    // keeping a reference to that parameter would dangle as soon as the
    // constructor returns. `mutable` keeps `&so` a plain OrderMaker * inside
    // the const call operator, exactly as it was with the reference member.
    mutable OrderMaker so;
    ComparisonEngine comp;

public:
    /** @param _so sort order to compare records with; copied into the functor. */
    TaggedRecordCompare(OrderMaker _so) : so(_so), comp() {}

    /** @return true when Compare(x.r, y.r) under the sort order is > 0. */
    bool operator()(const TaggedRecord & x, const TaggedRecord & y) const
    {
        return (comp.Compare(&(x.r), &(y.r), &so) > 0);
    }
};
/**
 * Sorter wired between an input Pipe and an output Pipe.
 *
 * Only declarations live in this header; the member functions are defined
 * elsewhere. NOTE(review): judging by the member names this is a two-phase
 * external sort run on a pthread worker (run generation, then merge) --
 * confirm against the implementation file.
 */
class BigQ {
    /**
     * Ordering functor over Record: x is ordered before y when
     * ComparisonEngine::Compare under the stored OrderMaker returns a
     * negative value.
     *
     * NOTE(review): std::binary_function is deprecated since C++11 and
     * removed in C++17; the base is kept so the type stays unchanged.
     */
    class Compare : public std::binary_function <Record, Record, bool>
    {
    private:
        // Stored by value. The constructor receives the OrderMaker by value,
        // so a reference member bound to that parameter would dangle once the
        // constructor returns. `mutable` keeps `&so` a plain OrderMaker *
        // inside the const call operator, as it was with the reference.
        mutable OrderMaker so;
        ComparisonEngine comp;

    public:
        /** @param _so sort order to compare records with; copied into the functor. */
        Compare(OrderMaker _so) : so(_so), comp() {}

        /** @return true when Compare(x, y) under the sort order is < 0. */
        bool operator()(const Record & x, const Record & y) const __attribute__ ((hot)) {
            return (comp.Compare(&x, &y, &so) < 0); }
    };

private:
    Pipe & in;                    // input pipe supplied to the constructor
    Pipe & out;                   // output pipe supplied to the constructor
    OrderMaker sortorder;         // own copy of the requested sort order
    int runlen;                   // run length given at construction (presumably in pages -- confirm)
    int pagesInserted;
    int totalRecords;
    int runCount;
    File partiallySortedFile;     // file object the Run cursors read from
    std::vector <std::pair <off_t, off_t> > runLocations;  // one offset pair per run (presumably start/end -- confirm)
    pthread_t worker_thread;

    // pthread-compatible static entry point taking an opaque context pointer.
    static void *thread_starter(void *context);
    void * WorkerThread(void);
    void PhaseOne(void);
    void sortRuns(std::vector<Record> & runlenrecords);
    int writeSortedRunToFile(std::vector<Record> & runlenrecords);
    void PhaseTwoLinearScan(void);
    void PhaseTwoPriorityQueue(void);

public:
    /**
     * @param in         pipe to consume from
     * @param out        pipe to produce into
     * @param sortorder  ordering to sort by
     * @param runlen     run length
     */
    BigQ (Pipe &in, Pipe &out, OrderMaker &sortorder, int runlen);
    ~BigQ ();
};
#endif