// 1b4c/1brc.go
//
// A One Billion Row Challenge solver. Sample run with 128 worker goroutines:
//
//	Total elapsed time: 4.99s
//	Total computed records: 1000000000
//	Validation passed: true

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"hash/fnv"
	"log"
	"math"
	"os"
	"runtime/pprof"
	"slices"
	"sort"
	"strings"
	"sync"
	"time"
)

const (
	measurementsFile = "measurements.txt" // input: one "station;temp" record per line
	testFile         = "dummy.txt"        // scratch file removed at startup
	resultFile       = "my_results.txt"   // where the report is written
	benchFile        = "results.txt"      // reference output to validate against
	profile          = false              // set to true to capture a CPU profile
)

// nGoRoutine is the number of worker goroutines, and therefore partitions.
var nGoRoutine = 128

// maxSeekLen bounds how far createPartitions scans for a split character
// when aligning a partition boundary.
const maxSeekLen = int64(100)
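
// partitionRange is the byte range of the input assigned to one worker;
// start is inclusive and end points just past the chunk's final newline.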
type partitionRange struct {
	start int64
	end   int64
}
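
// resultSet maps the FNV-64 hash of a station name to its running result.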
type resultSet map[uint64]result
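
// fastResultSet is a fixed-size, open-addressed table with no probing: a key
// simply overwrites whatever lives in its slot, so correctness depends on the
// table being large enough that the live keys never collide.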
type fastResultSet struct {
	v    []result
	keys []uint64
	size int
}

// This is wasteful: the data set has only ~412 unique station names, yet the
// table needs roughly 100x that many slots because the hash-mod placement has
// no collision handling; at 50000 slots the live keys happen not to collide.
func newFRS() *fastResultSet {
	const size = 50000
	return &fastResultSet{v: make([]result, size), keys: make([]uint64, size)}
}
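
// result accumulates per-station statistics; count and sum are float64 so the
// mean can be computed without conversions.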
type result struct {
	name  []byte
	count float64
	min   float64
	max   float64
	sum   float64
	//rAvg float64
}
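
// count tallies how many records contributed to the final output; the
// resultSet String method bumps it as a side effect.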
var count = int64(0)
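
// main times one full run, optionally under the CPU profiler, and then
// validates the output against the reference file.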
func main() {
	fmt.Printf("Starting with %d threads\n", nGoRoutine)
	var f *os.File
	if profile {
		f = startCpuProfile()
	}
	_ = os.Remove(testFile)
	start := time.Now()
	oneBRC()
	finish := time.Now()
	if profile {
		stopProfiling(f)
	}
	fmt.Printf("Total elapsed time: %.2fs\n", finish.Sub(start).Seconds())
	fmt.Printf("Total computed records: %d\n", count)
	validate()
}
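
// oneBRC reads the whole input into memory, splits it into newline-aligned
// partitions, fans the work out to one goroutine per partition, then merges
// the per-partition tables and writes the formatted report.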
func oneBRC() {
	bits, err := os.ReadFile(measurementsFile)
	if err != nil {
		panic(err)
	}
	parts := createPartitions(bits, '\n', nGoRoutine)
	wg := &sync.WaitGroup{}
	results := make([]*fastResultSet, len(parts))
	for i, part := range parts {
		wg.Add(1)
		go workerComputePartition(bits, part, i, results, wg)
	}
	wg.Wait()
	finalRS := make(resultSet)
	for _, rs := range results {
		finalRS.merge(rs.toResultSet())
	}
	if err := os.WriteFile(resultFile, []byte(finalRS.String()), 0666); err != nil {
		panic(err)
	}
}
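
// validate diffs our output against the reference results line by line; both
// files are expected to use CRLF line endings.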
func validate() {
	me, err := os.ReadFile(resultFile)
	if err != nil {
		panic(err)
	}
	ref, err := os.ReadFile(benchFile)
	if err != nil {
		panic(err)
	}
	meLines := bytes.Split(me, []byte{'\r', '\n'})
	refLines := bytes.Split(ref, []byte{'\r', '\n'})
	errs := 0
	if len(meLines) != len(refLines) {
		errs++
		fmt.Printf("Validation tripped: reference has %d lines, me %d\n", len(refLines), len(meLines))
	}
	for i, line := range meLines {
		if i >= len(refLines) {
			break
		}
		refLine := string(refLines[i])
		meLine := string(line)
		if refLine != meLine {
			errs++
			fmt.Printf("Validation tripped: reference[%s]\tme[%s]\n", refLine, meLine)
		}
	}
	fmt.Printf("Validation passed: %v\n", errs == 0)
}
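
// startCpuProfile writes a CPU profile to a timestamped .pprof file in the
// working directory.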
func startCpuProfile() *os.File {
	name := fmt.Sprintf("prof-%s.pprof", time.Now().Format("15-04-05"))
	f, err := os.Create(name)
	if err != nil {
		log.Fatal(err)
	}
	err = pprof.StartCPUProfile(f)
	if err != nil {
		log.Fatal(err)
	}
	return f
}

func stopProfiling(f *os.File) {
	pprof.StopCPUProfile()
	f.Close()
}
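
// workerComputePartition scans one partition of the raw bytes, parses
// "station;temp\r\n" records, and accumulates per-station stats into its own
// fastResultSet so no locking is needed; the finished set is stored at
// container[workerNumber].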
func workerComputePartition(aData []byte, wPart partitionRange, workerNumber int, container []*fastResultSet, wg *sync.WaitGroup) {
	defer wg.Done()
	const delimiter = byte(';')
	rs := newFRS()
	// Pre-allocated pool of zeroed results; assumes at most 500 unique
	// stations per partition (the data set has ~412).
	alloc := make([]result, 500)
	aCnt := 0
	var di int
	var line []byte
	hasher := fnv.New64()
	var start = wPart.start
	var end int64
	for end = seekNextNewLine(aData, wPart, start); end <= wPart.end; end = seekNextNewLine(aData, wPart, start) {
		// drop the trailing "\r\n"
		line = aData[start : end-2]
		// the temperature is at least "0.0", so the delimiter cannot sit in
		// the last two bytes; scan backward from there
		for di = len(line) - 3; di > -1; di-- {
			if line[di] == delimiter {
				break
			}
		}
		if di < 0 {
			continue
		}
		temp := fParseFloat(line[di+1:])
		hasher.Write(line[:di])
		key := hasher.Sum64()
		hasher.Reset()
		r, ok := rs.get(key)
		if !ok {
			r = alloc[aCnt]
			r.name = line[:di]
			r.min = math.MaxFloat64
			// sentinel below any reading; a positive sentinel such as
			// SmallestNonzeroFloat64 would break all-negative stations
			r.max = -math.MaxFloat64
			aCnt++
		}
		r.count += 1.0
		//r.rAvg = ((r.rAvg * (r.count - 1.0)) + temp) / r.count
		r.sum = r.sum + temp
		if temp > r.max {
			r.max = temp
		}
		if temp < r.min {
			r.min = temp
		}
		rs.put(key, r)
		start = end
		if end == wPart.end {
			break
		}
	}
	container[workerNumber] = rs
}
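
// fParseFloat decodes a temperature with exactly one decimal digit, i.e. one
// of the shapes x.x, xx.x, -x.x, -xx.x, without going through
// strconv.ParseFloat. Anything else panics.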
func fParseFloat(str []byte) float64 {
	switch len(str) {
	case 3:
		// x.x
		a := float64(fCharToInt(str[0]))
		b := float64(fCharToInt(str[2])) * 0.1
		return a + b
	case 4:
		// either xx.x or -x.x
		if str[0] == '-' {
			// -x.x
			a := float64(fCharToInt(str[1]))
			b := float64(fCharToInt(str[3])) * 0.1
			return -(a + b)
		}
		// xx.x
		a := float64(f2CharToInt(str[0], str[1]))
		b := float64(fCharToInt(str[3])) * 0.1
		return a + b
	case 5:
		// -xx.x
		a := float64(f2CharToInt(str[1], str[2]))
		b := float64(fCharToInt(str[4])) * 0.1
		return -(a + b)
	default:
		panic(fmt.Errorf("bad float string: %s", string(str)))
	}
}
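
// fCharToInt and f2CharToInt convert one and two ASCII decimal digits to an
// int, with no bounds checking.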
func fCharToInt(b byte) int {
	return int(b - '0')
}

func f2CharToInt(b1 byte, b2 byte) int {
	return int(((b1 - '0') * 10) + (b2 - '0'))
}
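
// seekNextNewLine returns the index just past the next record terminator at
// or after last+8. It strides 2 bytes at a time: every record ends ".d\r\n",
// so one parity lands on '.' or '\r' and the other lands on '\n', and each
// case can jump straight past the terminator. The initial 8-byte skip assumes
// every record is at least 9 bytes (a station name of 3+ bytes), which holds
// for this data set.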
func seekNextNewLine(b []byte, part partitionRange, last int64) int64 {
	const step = 8
	for i := last + step; i < part.end; {
		switch b[i] {
		case '\n':
			return i + 1
		case '\r':
			return i + 2 // "\r\n": next line starts 2 past the '\r'
		case '.':
			return i + 4 // ".d\r\n": next line starts 4 past the '.'
		default:
			i += 2
		}
	}
	return part.end
}
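
// createPartitions splits data into nPart ranges, nudging each boundary
// forward (or, failing that, backward) by up to maxSeekLen bytes so that
// every partition ends just past a seekChar. Trailing empty partitions are
// trimmed from the returned slice.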
func createPartitions(data []byte, seekChar byte, nPart int) []partitionRange {
	if len(data) == 0 || nPart == 0 {
		return make([]partitionRange, nPart)
	}
	si := int64(0)
	step := int64(len(data) / nPart)
	tLen := int64(len(data))
	partitions := make([]partitionRange, nPart)
	for i := 0; i < nPart; i++ {
		start := si
		end := si + step
		if start >= tLen {
			// earlier partitions already consumed the rest of the data
			return partitions[:i]
		}
		if end > tLen {
			partitions[i] = partitionRange{start: start, end: tLen}
			return partitions[:i+1]
		}
		found := false
		seekForward := end + maxSeekLen
		// seek forward for the next split character
		for j := end; j < min(seekForward, tLen); j++ {
			if data[j] == seekChar {
				end = j + 1
				found = true
				break
			}
		}
		if !found {
			// look backward from the end point in case we barely over-stepped
			seekBack := max(0, end-maxSeekLen)
			for j := end - 1; j > seekBack; j-- {
				if data[j] == seekChar {
					end = j + 1
					found = true
					break
				}
			}
			if !found {
				fmt.Printf("Could not find split in range [%d, %d]\n%s", seekBack, seekForward, string(data[seekBack:min(seekForward, tLen)]))
				panic("thread died")
			}
		}
		partitions[i] = partitionRange{start: start, end: end}
		si = end
	}
	return partitions
}
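
// merge folds other's statistics into r.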
func (r *result) merge(other *result) {
	if other.max > r.max {
		r.max = other.max
	}
	if other.min < r.min {
		r.min = other.min
	}
	//r.rAvg = ((r.rAvg * r.count) + (other.rAvg * other.count)) / (r.count + other.count)
	r.sum += other.sum
	r.count += other.count
}
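
// merge folds every entry of other into rs, combining stats for shared keys.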
func (rs resultSet) merge(other resultSet) {
	for k, v := range other {
		tr, ok := rs[k]
		if !ok {
			rs[k] = v
			continue
		}
		tr.merge(&v)
		rs[k] = tr
	}
}
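
// Report is a debugging helper: it histograms station names by length and
// renders each bucket as JSON.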
func (rs resultSet) Report() string {
	bldr := &strings.Builder{}
	keyHist := make(map[int][]string)
	for _, v := range rs {
		name := string(v.name)
		key := len(name)
		container, ok := keyHist[key]
		if !ok {
			container = []string{}
		}
		if !slices.Contains(container, name) {
			container = append(container, name)
			keyHist[key] = container
		}
	}
	keyList := []int{}
	for k := range keyHist {
		keyList = append(keyList, k)
	}
	slices.Sort(keyList)
	for _, key := range keyList {
		b, _ := json.Marshal(keyHist[key])
		bldr.WriteString(fmt.Sprintf("[%d]{%s}\n", key, string(b)))
	}
	return bldr.String()
}
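
// String renders the sorted "name=min/mean/max" report, CRLF-separated to
// match the reference file, and bumps the global record count as it goes.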
func (rs resultSet) String() string {
	keys := make([]string, 0, len(rs))
	for _, v := range rs {
		keys = append(keys, string(v.name))
	}
	sort.Strings(keys)
	b := &strings.Builder{}
	hasher := fnv.New64()
	for i, key := range keys {
		hasher.Write([]byte(key))
		r := rs[hasher.Sum64()]
		hasher.Reset()
		count += int64(r.count)
		fmt.Fprintf(b, "%s=%.1f/%.1f/%.1f", key, r.min, r.sum/r.count, r.max)
		if i < len(keys)-1 {
			b.WriteString(",\r\n")
		}
	}
	return b.String()
}
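
// get returns the slot for key; a nil name marks an empty slot. Because
// keyMap does plain modulo with no probing, a collision between live keys
// would silently merge two stations.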
func (f *fastResultSet) get(key uint64) (result, bool) {
	p := f.v[f.keyMap(key)]
	return p, p.name != nil
}

func (f *fastResultSet) put(key uint64, v result) {
	idx := f.keyMap(key)
	if f.v[idx].name == nil {
		// only count the first occupancy of a slot, so size stays the
		// number of unique keys rather than the number of puts
		f.size += 1
	}
	f.v[idx] = v
	f.keys[idx] = key
}

func (f *fastResultSet) keyMap(key uint64) uint64 {
	return key % uint64(len(f.v))
}
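
// getKeys lists the occupied slots; key 0 doubles as the empty marker, so a
// station whose FNV-64 hash were exactly 0 would be dropped (vanishingly
// unlikely).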
func (f *fastResultSet) getKeys() []uint64 {
	keys := make([]uint64, 0, f.size)
	for _, k := range f.keys {
		if k > 0 {
			keys = append(keys, k)
		}
	}
	return keys
}
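
// toResultSet copies the occupied slots into an ordinary map for merging.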
func (f *fastResultSet) toResultSet() resultSet {
	rs := make(resultSet)
	for _, key := range f.getKeys() {
		v, _ := f.get(key)
		rs[key] = v
	}
	return rs
}