float parsing

This commit is contained in:
dtookey 2024-08-01 06:42:54 -04:00
parent 3c13939717
commit 803dae7a11

197
1brc.go
View File

@ -2,25 +2,18 @@ package main
import ( import (
"bytes" "bytes"
"errors"
"fmt" "fmt"
"hash/fnv" "hash/fnv"
"io"
"log" "log"
"math" "math"
"os" "os"
"runtime/pprof" "runtime/pprof"
"sort" "sort"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
) )
//todo:
// bytes.split is doing a ton of allocation. I think we can do better by only returning references instead of doing
// a copy
const measurementsFile = "measurements.txt" const measurementsFile = "measurements.txt"
const testFile = "dummy.txt" const testFile = "dummy.txt"
const resultFile = "my_results.txt" const resultFile = "my_results.txt"
@ -56,10 +49,9 @@ func main() {
f = startCpuProfile() f = startCpuProfile()
} }
_ = os.Remove(resultFile) _ = os.Remove(testFile)
start := time.Now() start := time.Now()
oneBRC() oneBRC()
//stream1BRC()
finish := time.Now() finish := time.Now()
if profile { if profile {
stopProfiling(f) stopProfiling(f)
@ -69,83 +61,6 @@ func main() {
validate() validate()
} }
func validate() {
me, err := os.ReadFile(resultFile)
if err != nil {
panic(err)
}
ref, err := os.ReadFile(benchFile)
if err != nil {
panic(err)
}
meLines := bytes.Split(me, []byte{'\r', '\n'})
refLines := bytes.Split(ref, []byte{'\r', '\n'})
for i, line := range meLines {
refLine := string(refLines[i])
meLine := string(line)
if refLine != meLine {
fmt.Printf("Validation tripped: reference[%s]\tme[%s]\n", refLine, meLine)
}
}
}
func stream1BRC() {
f, err := os.OpenFile(measurementsFile, os.O_RDONLY, 0400)
if err != nil {
panic(err)
}
defer f.Close()
info, err := f.Stat()
if err != nil {
panic(err)
}
idealChunkSize := (info.Size() / int64(nGoRoutine-1))
wg := &sync.WaitGroup{}
results := make([]resultSet, nGoRoutine)
idx := int64(0)
for i := 0; i < nGoRoutine; i++ {
wg.Add(1)
chunk := make([]byte, idealChunkSize)
l, err := f.ReadAt(chunk, idx)
if err != nil {
if !errors.Is(io.EOF, err) {
panic(err)
} else {
fmt.Println("EOF")
}
}
var j int
for j = l - 1; j > 0; j-- {
if chunk[j] == '\n' {
break
}
}
idx = int64(j + 1)
go workerComputeChunk(chunk[:j+1], i, results, wg)
}
wg.Wait()
finalRS := make(resultSet)
for _, rs := range results {
finalRS.merge(rs)
}
os.WriteFile(resultFile, []byte(finalRS.String()), 0666)
}
func oneBRC() { func oneBRC() {
bits, err := os.ReadFile(measurementsFile) bits, err := os.ReadFile(measurementsFile)
if err != nil { if err != nil {
@ -171,6 +86,29 @@ func oneBRC() {
os.WriteFile(resultFile, []byte(finalRS.String()), 0666) os.WriteFile(resultFile, []byte(finalRS.String()), 0666)
} }
func validate() {
me, err := os.ReadFile(resultFile)
if err != nil {
panic(err)
}
ref, err := os.ReadFile(benchFile)
if err != nil {
panic(err)
}
meLines := bytes.Split(me, []byte{'\r', '\n'})
refLines := bytes.Split(ref, []byte{'\r', '\n'})
for i, line := range meLines {
refLine := string(refLines[i])
meLine := string(line)
if refLine != meLine {
fmt.Printf("Validation tripped: reference[%s]\tme[%s]\n", refLine, meLine)
}
}
}
func startCpuProfile() *os.File { func startCpuProfile() *os.File {
name := fmt.Sprintf("prof-%s.pprof", time.Now().Format("15-04-05")) name := fmt.Sprintf("prof-%s.pprof", time.Now().Format("15-04-05"))
f, err := os.Create(name) f, err := os.Create(name)
@ -218,11 +156,7 @@ func workerComputePartition(aData []byte, wPart partitionRange, workerNumber int
continue continue
} }
temp, err := strconv.ParseFloat(string(line[di+1:][:len(line[di+1:])]), 64) temp := fParseFloat(line[di+1:][:len(line[di+1:])])
if err != nil {
fmt.Printf("failed to parse float[%s]: %+v", string(line[di+1:]), err)
return
}
hasher.Write(line[:di]) hasher.Write(line[:di])
key := hasher.Sum64() key := hasher.Sum64()
@ -257,73 +191,36 @@ func workerComputePartition(aData []byte, wPart partitionRange, workerNumber int
container[workerNumber] = rs container[workerNumber] = rs
} }
func workerComputeChunk(aData []byte, workerNumber int, container []resultSet, wg *sync.WaitGroup) { func fParseFloat(str []byte) float64 {
defer wg.Done() //kick everything off by computing the remainder
wPart := partitionRange{start: 0, end: int64(len(aData))} v := float64(fCharToInt(str[len(str)-1])) / 10.0
const delimiter = byte(';') positive := true
end := 0
rs := make(resultSet) if str[0] == '-' {
positive = false
alloc := make([]result, 500) end = 1
aCnt := 0
var di int
var line []byte
hasher := fnv.New64()
var start = wPart.start
var end int64
for end = seekNextNewLine(aData, wPart, start); end <= wPart.end; end = seekNextNewLine(aData, wPart, start) {
line = aData[start : end-2]
//we know there will be at minimum 0.0, so we can skip 3 bytes
for di = len(line) - 3; di > -1; di-- {
if line[di] == delimiter {
break
}
} }
if di < 0 { start := len(str) - 3
continue oom := 0.0
for i := start; i >= end; i-- {
v += float64(fCharToInt(str[i])) * math.Pow(10.0, oom)
oom += 1.0
} }
temp, err := strconv.ParseFloat(string(line[di+1:][:len(line[di+1:])]), 64) if positive {
if err != nil { return v
fmt.Printf("failed to parse float[%s]: %+v", string(line[di+1:]), err) } else {
return return -v
} }
}
hasher.Write(line[:di]) func fCharToInt(b byte) int {
key := hasher.Sum64() if b >= '0' && b <= '9' {
hasher.Reset() return int(b - '0')
r, ok := rs[key]
if !ok {
r = alloc[aCnt]
r.name = line[:di]
r.min = math.MaxFloat64
r.max = math.SmallestNonzeroFloat64
aCnt++
} }
r.count += 1.0 panic(fmt.Errorf("couldn't parse: %v", string(b)))
r.rAvg = ((r.rAvg * (r.count - 1.0)) + temp) / r.count
if temp > r.max {
r.max = temp
}
if temp < r.min {
r.min = temp
}
rs[key] = r
start = end
if end == wPart.end {
break
}
}
container[workerNumber] = rs
} }
func seekNextNewLine(b []byte, part partitionRange, last int64) int64 { func seekNextNewLine(b []byte, part partitionRange, last int64) int64 {