-
Notifications
You must be signed in to change notification settings - Fork 0
/
splitfilereader.go
55 lines (47 loc) · 1.34 KB
/
splitfilereader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package main
import (
"fmt"
"log/slog"
"os"
)
// SplitFileReaders describes how one file is divided into contiguous byte
// ranges, one per chunk reader.
type SplitFileReaders struct {
// fileName is the path of the file to read; it is stat'ed once at
// construction time and handed to every chunk reader.
fileName string
// nbReaders is the number of chunk readers (and goroutines) to start.
nbReaders int
// fileSize is the size of fileName in bytes, as reported by os.Stat when
// the value was built by NewSplitFileReader.
fileSize int64
}
// NewSplitFileReader builds a SplitFileReaders for fileName that will split
// the work across nbReaders chunk readers.
//
// The file size is captured here through fileSize, which panics when the file
// cannot be stat'ed, so this constructor panics in that case as well.
func NewSplitFileReader(fileName string, nbReaders int) SplitFileReaders {
	var readers SplitFileReaders
	readers.fileName = fileName
	readers.nbReaders = nbReaders
	readers.fileSize = fileSize(fileName)
	return readers
}
// processFileConcurrently logs the start-up message, starts one chunk reader
// per configured reader and returns the unbuffered channel that was handed to
// all of them.
//
// The returned error is always nil. Presumably each chunk reader delivers its
// per-city map on the channel — confirm against the chunk reader
// implementation, which also decides how many values the caller should expect.
func (sfr *SplitFileReaders) processFileConcurrently() (chan map[uint32]*CityTemperatures, error) {
	startMsg := fmt.Sprintf("Start %v readers for file %v", sfr.nbReaders, sfr.fileName)
	slog.Info(startMsg)

	results := make(chan map[uint32]*CityTemperatures)
	sfr.startReaders(results)
	return results, nil
}
// fileSize returns the size in bytes of the file at fileName.
//
// If the file cannot be stat'ed, the error is logged through slog and the
// function panics with that error.
func fileSize(fileName string) int64 {
	info, statErr := os.Stat(fileName)
	if statErr == nil {
		return info.Size()
	}
	slog.Error(statErr.Error())
	panic(statErr)
}
// defaultChunkSize returns the nominal number of bytes given to each reader:
// the file size divided by the number of readers (integer division), plus one.
//
// The +1 makes nbReaders chunks of this size always cover the whole file even
// when the division truncates. It panics with a division by zero when
// nbReaders is 0.
func (sfr *SplitFileReaders) defaultChunkSize() int64 {
	readers := int64(sfr.nbReaders)
	return 1 + sfr.fileSize/readers
}
// chunkSizeForReader returns the number of bytes assigned to the reader with
// 0-based index readerNb.
//
// Reader i starts at byte offset i*defaultChunkSize(). A reader normally gets
// defaultChunkSize() bytes; when that would run past the end of the file the
// size is clamped to the bytes remaining after the reader's offset, and to 0
// when the offset is already at or beyond the end of the file.
//
// The clamp matters when the file is small relative to the number of readers:
// for a 4-byte file and 4 readers the nominal chunk is 2 bytes, so a plain
// "file size minus the preceding chunks" for the last reader would be
// 4 - 3*2 = -2, and reader 2 would be given 2 bytes starting at the end of the
// file. With the clamp the sizes are 2, 2, 0, 0 and always sum to the file
// size. For files large enough that every offset lies inside the file, the
// result is the same as without the clamp: full chunks for all readers but the
// last, and the remainder for the last one.
func (sfr *SplitFileReaders) chunkSizeForReader(readerNb int) int64 {
	chunk := sfr.defaultChunkSize()

	// Bytes left in the file from this reader's start offset onwards.
	remaining := sfr.fileSize - int64(readerNb)*chunk
	if remaining <= 0 {
		return 0
	}
	if remaining < chunk {
		return remaining
	}
	return chunk
}
// startReaders creates one chunk reader per configured reader and launches
// each of them in its own goroutine.
//
// Reader idx is given the file name, its index, its start offset
// (idx * defaultChunkSize()), its chunk length and chunkResultChan. The
// function returns as soon as the goroutines are started; it does not wait
// for them.
func (sfr *SplitFileReaders) startReaders(chunkResultChan chan map[uint32]*CityTemperatures) {
	for idx := 0; idx < sfr.nbReaders; idx++ {
		offset := int64(idx) * sfr.defaultChunkSize()
		length := sfr.chunkSizeForReader(idx)
		// The reader id is a uint8, so indexes above 255 wrap around.
		reader := NewChunkReader(sfr.fileName, uint8(idx), offset, length, chunkResultChan)
		go reader.startReader()
	}
}