checkpoint prior to streaming
This commit is contained in:
commit
737e8990c9
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
/measurements.txt
|
||||||
|
/results.txt
|
||||||
8
.idea/.gitignore
vendored
Normal file
8
.idea/.gitignore
vendored
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
# Default ignored files
|
||||||
|
/shelf/
|
||||||
|
/workspace.xml
|
||||||
|
# Editor-based HTTP Client requests
|
||||||
|
/httpRequests/
|
||||||
|
# Datasource local storage ignored files
|
||||||
|
/dataSources/
|
||||||
|
/dataSources.local.xml
|
||||||
6
.idea/misc.xml
Normal file
6
.idea/misc.xml
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project version="4">
|
||||||
|
<component name="ProjectRootManager">
|
||||||
|
<output url="file://$PROJECT_DIR$/out" />
|
||||||
|
</component>
|
||||||
|
</project>
|
||||||
8
.idea/modules.xml
Normal file
8
.idea/modules.xml
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project version="4">
|
||||||
|
<component name="ProjectModuleManager">
|
||||||
|
<modules>
|
||||||
|
<module fileurl="file://$PROJECT_DIR$/1brc-go.iml" filepath="$PROJECT_DIR$/1brc-go.iml" />
|
||||||
|
</modules>
|
||||||
|
</component>
|
||||||
|
</project>
|
||||||
6
.idea/vcs.xml
Normal file
6
.idea/vcs.xml
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project version="4">
|
||||||
|
<component name="VcsDirectoryMappings">
|
||||||
|
<mapping directory="$PROJECT_DIR$" vcs="Git" />
|
||||||
|
</component>
|
||||||
|
</project>
|
||||||
9
1brc-go.iml
Normal file
9
1brc-go.iml
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<module type="WEB_MODULE" version="4">
|
||||||
|
<component name="Go" enabled="true" />
|
||||||
|
<component name="NewModuleRootManager" inherit-compiler-output="true">
|
||||||
|
<exclude-output />
|
||||||
|
<content url="file://$MODULE_DIR$" />
|
||||||
|
<orderEntry type="sourceFolder" forTests="false" />
|
||||||
|
</component>
|
||||||
|
</module>
|
||||||
364
1brc.go
Normal file
364
1brc.go
Normal file
@ -0,0 +1,364 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"hash/fnv"
|
||||||
|
"log"
|
||||||
|
"math"
|
||||||
|
"os"
|
||||||
|
"runtime/pprof"
|
||||||
|
"sort"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
//todo:
|
||||||
|
// bytes.split is doing a ton of allocation. I think we can do better by only returning references instead of doing
|
||||||
|
// a copy
|
||||||
|
|
||||||
|
const measurementsFile = "measurements.txt"
|
||||||
|
const testFile = "dummy.txt"
|
||||||
|
const resultFile = "my_results.txt"
|
||||||
|
const benchFile = "results.txt"
|
||||||
|
|
||||||
|
const profile = true
|
||||||
|
|
||||||
|
var nGoRoutine = 2
|
||||||
|
|
||||||
|
const maxSeekLen = int64(100)
|
||||||
|
|
||||||
|
type partitionRange struct {
|
||||||
|
start int64
|
||||||
|
end int64
|
||||||
|
}
|
||||||
|
|
||||||
|
type resultSet map[uint64]result
|
||||||
|
|
||||||
|
type result struct {
|
||||||
|
name []byte
|
||||||
|
count float64
|
||||||
|
min float64
|
||||||
|
max float64
|
||||||
|
rAvg float64
|
||||||
|
}
|
||||||
|
|
||||||
|
var count = int64(0)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
fmt.Printf("Starting with %d threads\n", nGoRoutine)
|
||||||
|
var f *os.File
|
||||||
|
if profile {
|
||||||
|
f = startCpuProfile()
|
||||||
|
}
|
||||||
|
_ = os.Remove(resultFile)
|
||||||
|
start := time.Now()
|
||||||
|
oneBRC()
|
||||||
|
finish := time.Now()
|
||||||
|
if profile {
|
||||||
|
stopProfiling(f)
|
||||||
|
}
|
||||||
|
fmt.Printf("Total ellapsed time: %.2fs\n", finish.Sub(start).Seconds())
|
||||||
|
fmt.Printf("Total computed records: %d\n", count)
|
||||||
|
validate()
|
||||||
|
}
|
||||||
|
|
||||||
|
func validate() {
|
||||||
|
me, err := os.ReadFile(resultFile)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ref, err := os.ReadFile(benchFile)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
meLines := bytes.Split(me, []byte{'\r', '\n'})
|
||||||
|
refLines := bytes.Split(ref, []byte{'\r', '\n'})
|
||||||
|
|
||||||
|
for i, line := range meLines {
|
||||||
|
refLine := string(refLines[i])
|
||||||
|
meLine := string(line)
|
||||||
|
if refLine != meLine {
|
||||||
|
fmt.Printf("Validation tripped: reference[%s]\tme[%s]\n", refLine, meLine)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func oneBRC() {
|
||||||
|
bits, err := os.ReadFile(measurementsFile)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
parts := createPartitions(bits, '\n', nGoRoutine)
|
||||||
|
wg := &sync.WaitGroup{}
|
||||||
|
results := make([]resultSet, len(parts))
|
||||||
|
|
||||||
|
for i, part := range parts {
|
||||||
|
wg.Add(1)
|
||||||
|
go workerComputePartition(bits, part, i, results, wg)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
finalRS := make(resultSet)
|
||||||
|
|
||||||
|
for _, rs := range results {
|
||||||
|
finalRS.merge(rs)
|
||||||
|
}
|
||||||
|
|
||||||
|
os.WriteFile(resultFile, []byte(finalRS.String()), 0666)
|
||||||
|
}
|
||||||
|
|
||||||
|
func startCpuProfile() *os.File {
|
||||||
|
name := fmt.Sprintf("prof-%s.pprof", time.Now().Format("15-04-05"))
|
||||||
|
f, err := os.Create(name)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
err = pprof.StartCPUProfile(f)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
return f
|
||||||
|
}
|
||||||
|
|
||||||
|
func stopProfiling(f *os.File) {
|
||||||
|
pprof.StopCPUProfile()
|
||||||
|
f.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func workerComputePartition(aData []byte, wPart partitionRange, workerNumber int, container []resultSet, wg *sync.WaitGroup) {
|
||||||
|
defer wg.Done()
|
||||||
|
const delimiter = byte(';')
|
||||||
|
|
||||||
|
rs := make(resultSet)
|
||||||
|
|
||||||
|
alloc := make([]result, 500)
|
||||||
|
aCnt := 0
|
||||||
|
var di int
|
||||||
|
var line []byte
|
||||||
|
|
||||||
|
hasher := fnv.New64()
|
||||||
|
|
||||||
|
var start = wPart.start
|
||||||
|
var end int64
|
||||||
|
for end = seekNextNewLine(aData, wPart, start); end <= wPart.end; end = seekNextNewLine(aData, wPart, start) {
|
||||||
|
|
||||||
|
line = aData[start : end-2]
|
||||||
|
//we know there will be at minimum 0.0, so we can skip 3 bytes
|
||||||
|
for di = len(line) - 3; di > -1; di-- {
|
||||||
|
if line[di] == delimiter {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if di < 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
temp, err := strconv.ParseFloat(string(line[di+1:][:len(line[di+1:])]), 64)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("failed to parse float[%s]: %+v", string(line[di+1:]), err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
hasher.Write(line[:di])
|
||||||
|
key := hasher.Sum64()
|
||||||
|
hasher.Reset()
|
||||||
|
r, ok := rs[key]
|
||||||
|
if !ok {
|
||||||
|
r = alloc[aCnt]
|
||||||
|
r.name = line[:di]
|
||||||
|
r.min = math.MaxFloat64
|
||||||
|
r.max = math.SmallestNonzeroFloat64
|
||||||
|
aCnt++
|
||||||
|
}
|
||||||
|
r.count += 1.0
|
||||||
|
|
||||||
|
r.rAvg = ((r.rAvg * (r.count - 1.0)) + temp) / r.count
|
||||||
|
|
||||||
|
if temp > r.max {
|
||||||
|
r.max = temp
|
||||||
|
}
|
||||||
|
|
||||||
|
if temp < r.min {
|
||||||
|
r.min = temp
|
||||||
|
}
|
||||||
|
|
||||||
|
rs[key] = r
|
||||||
|
start = end
|
||||||
|
if end == wPart.end {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
container[workerNumber] = rs
|
||||||
|
}
|
||||||
|
|
||||||
|
// createNewLineIndex
|
||||||
|
// we know that if we *just* returned a line, we can skip a few characters for known values
|
||||||
|
// each line has:
|
||||||
|
// - 5 x.x\r\n
|
||||||
|
// - 1 ;
|
||||||
|
// - 2 [a-z] first 2 characters of location
|
||||||
|
//
|
||||||
|
// for a total of 8
|
||||||
|
func acreateNewLineIndex(b []byte, part partitionRange) []int64 {
|
||||||
|
idx := make([]int64, 0, 16*1024*1024)
|
||||||
|
const step = 8
|
||||||
|
|
||||||
|
for i := part.start; i < part.end; i++ {
|
||||||
|
if b[i] == '\n' {
|
||||||
|
idx = append(idx, i+1)
|
||||||
|
i += step
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return idx
|
||||||
|
}
|
||||||
|
|
||||||
|
func seekNextNewLine(b []byte, part partitionRange, last int64) int64 {
|
||||||
|
const step = 8
|
||||||
|
for i := last + step; i < part.end; {
|
||||||
|
switch b[i] {
|
||||||
|
case '\n':
|
||||||
|
return i + 1
|
||||||
|
case '\r':
|
||||||
|
return i + 2
|
||||||
|
case ';': // this will be minimum [;0.0\r]
|
||||||
|
i += 5
|
||||||
|
default:
|
||||||
|
i += 2
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return part.end
|
||||||
|
}
|
||||||
|
|
||||||
|
func createPartitions(data []byte, seekChar byte, nPart int) []partitionRange {
|
||||||
|
if len(data) == 0 || nPart == 0 {
|
||||||
|
return make([]partitionRange, nPart)
|
||||||
|
}
|
||||||
|
|
||||||
|
if nPart == 0 {
|
||||||
|
return []partitionRange{}
|
||||||
|
}
|
||||||
|
|
||||||
|
si := int64(0)
|
||||||
|
step := int64(len(data) / nPart)
|
||||||
|
tLen := int64(len(data))
|
||||||
|
|
||||||
|
partitions := make([]partitionRange, nPart)
|
||||||
|
|
||||||
|
for i := 0; i < nPart; i++ {
|
||||||
|
start := si
|
||||||
|
|
||||||
|
end := si + step
|
||||||
|
|
||||||
|
if start > tLen {
|
||||||
|
return partitions[:len(partitions)-1]
|
||||||
|
}
|
||||||
|
|
||||||
|
if end > tLen {
|
||||||
|
partitions[i] = partitionRange{start: start, end: tLen}
|
||||||
|
return partitions
|
||||||
|
}
|
||||||
|
|
||||||
|
found := false
|
||||||
|
seekForward := end + maxSeekLen
|
||||||
|
//seek next new line
|
||||||
|
for j := end; j < min(seekForward, tLen); j++ {
|
||||||
|
if data[j] == seekChar {
|
||||||
|
if end == tLen {
|
||||||
|
//if the final character is also a split character we just return the rest of the chunk
|
||||||
|
end = tLen
|
||||||
|
} else {
|
||||||
|
end = j + 1
|
||||||
|
}
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !found {
|
||||||
|
//we'll start looking backward from the end point by the seek amount just in case we barely over-stepped
|
||||||
|
seekBack := max(0, end-maxSeekLen)
|
||||||
|
for j := end; j > seekBack; j-- {
|
||||||
|
if data[j] == seekChar {
|
||||||
|
end = j + 1
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
fmt.Printf("Could not find split in range [%d, %d]\n%s", seekBack, seekForward, string(data)[seekBack:seekForward])
|
||||||
|
panic("thread died")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
partitions[i] = partitionRange{start: start, end: end}
|
||||||
|
si = end
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
return partitions
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *result) merge(other *result) {
|
||||||
|
if other.max > r.max {
|
||||||
|
r.max = other.max
|
||||||
|
}
|
||||||
|
|
||||||
|
if other.min < r.min {
|
||||||
|
r.min = other.min
|
||||||
|
}
|
||||||
|
|
||||||
|
r.rAvg = ((r.rAvg * r.count) + (other.rAvg * other.count)) / (r.count + other.count)
|
||||||
|
r.count += other.count
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rs resultSet) merge(other resultSet) {
|
||||||
|
for k, v := range other {
|
||||||
|
tr, ok := rs[k]
|
||||||
|
if !ok {
|
||||||
|
rs[k] = v
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
tr.merge(&v)
|
||||||
|
|
||||||
|
rs[k] = tr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rs resultSet) String() string {
|
||||||
|
keys := make([]string, 0, len(rs))
|
||||||
|
|
||||||
|
for _, v := range rs {
|
||||||
|
keys = append(keys, string(v.name))
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Strings(keys)
|
||||||
|
b := &strings.Builder{}
|
||||||
|
hasher := fnv.New64()
|
||||||
|
for i, key := range keys {
|
||||||
|
hasher.Write([]byte(key))
|
||||||
|
r := rs[hasher.Sum64()]
|
||||||
|
hasher.Reset()
|
||||||
|
count += int64(r.count)
|
||||||
|
b.WriteString(key)
|
||||||
|
b.WriteString("=")
|
||||||
|
b.WriteString(fmt.Sprintf("%.1f", r.min))
|
||||||
|
b.WriteString("/")
|
||||||
|
b.WriteString(fmt.Sprintf("%.1f", r.rAvg))
|
||||||
|
b.WriteString("/")
|
||||||
|
b.WriteString(fmt.Sprintf("%.1f", r.max))
|
||||||
|
if i < len(keys)-1 {
|
||||||
|
b.WriteString(",\r\n")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return b.String()
|
||||||
|
}
|
||||||
66
1brc_test.go
Normal file
66
1brc_test.go
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestCreatePartitions(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
data []byte
|
||||||
|
seekChar byte
|
||||||
|
nPart int
|
||||||
|
expected []partitionRange
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
"NormalCase",
|
||||||
|
[]byte("xxxx1xxxx1xxxx1xxxx1xxxx1xxxx1xxxx1xxxx1xxxx1"),
|
||||||
|
'1',
|
||||||
|
3,
|
||||||
|
[]partitionRange{{0, 15}, {15, 30}, {30, 45}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"NoCharFound",
|
||||||
|
[]byte("xxx1xxx1xxx1xxx1xxx1xxx1xxx1xxx1xxx1"),
|
||||||
|
'z',
|
||||||
|
3,
|
||||||
|
[]partitionRange{{0, 12}, {12, 24}, {24, 36}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"SingleCharData",
|
||||||
|
[]byte{'g'},
|
||||||
|
'g',
|
||||||
|
1,
|
||||||
|
[]partitionRange{{0, 1}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"EmptyData",
|
||||||
|
[]byte{},
|
||||||
|
'g',
|
||||||
|
3,
|
||||||
|
[]partitionRange{{0, 0}, {0, 0}, {0, 0}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"NoPartitions",
|
||||||
|
[]byte("abcdefghijk"),
|
||||||
|
'g',
|
||||||
|
0,
|
||||||
|
[]partitionRange{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
result := createPartitions(tt.data, tt.seekChar, tt.nPart)
|
||||||
|
if len(result) != len(tt.expected) {
|
||||||
|
t.Errorf("Got %v, expected %v", result, tt.expected)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, val := range result {
|
||||||
|
if val != tt.expected[i] {
|
||||||
|
t.Errorf("At %v: got %v, expected %v", i, val, tt.expected[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue
Block a user