From 803dae7a11f7e848d822c0d231f86cdfb519db1f Mon Sep 17 00:00:00 2001 From: dtookey Date: Thu, 1 Aug 2024 06:42:54 -0400 Subject: [PATCH] float parsing --- 1brc.go | 207 ++++++++++++++------------------------------------------ 1 file changed, 52 insertions(+), 155 deletions(-) diff --git a/1brc.go b/1brc.go index c7ddf4f..e3bdc46 100644 --- a/1brc.go +++ b/1brc.go @@ -2,25 +2,18 @@ package main import ( "bytes" - "errors" "fmt" "hash/fnv" - "io" "log" "math" "os" "runtime/pprof" "sort" - "strconv" "strings" "sync" "time" ) -//todo: -// bytes.split is doing a ton of allocation. I think we can do better by only returning references instead of doing -// a copy - const measurementsFile = "measurements.txt" const testFile = "dummy.txt" const resultFile = "my_results.txt" @@ -56,10 +49,9 @@ func main() { f = startCpuProfile() } - _ = os.Remove(resultFile) + _ = os.Remove(testFile) start := time.Now() oneBRC() - //stream1BRC() finish := time.Now() if profile { stopProfiling(f) @@ -69,83 +61,6 @@ func main() { validate() } -func validate() { - me, err := os.ReadFile(resultFile) - if err != nil { - panic(err) - } - - ref, err := os.ReadFile(benchFile) - if err != nil { - panic(err) - } - - meLines := bytes.Split(me, []byte{'\r', '\n'}) - refLines := bytes.Split(ref, []byte{'\r', '\n'}) - - for i, line := range meLines { - refLine := string(refLines[i]) - meLine := string(line) - if refLine != meLine { - fmt.Printf("Validation tripped: reference[%s]\tme[%s]\n", refLine, meLine) - } - } -} - -func stream1BRC() { - - f, err := os.OpenFile(measurementsFile, os.O_RDONLY, 0400) - if err != nil { - panic(err) - } - defer f.Close() - - info, err := f.Stat() - if err != nil { - panic(err) - } - - idealChunkSize := (info.Size() / int64(nGoRoutine-1)) - - wg := &sync.WaitGroup{} - results := make([]resultSet, nGoRoutine) - - idx := int64(0) - - for i := 0; i < nGoRoutine; i++ { - wg.Add(1) - chunk := make([]byte, idealChunkSize) - l, err := f.ReadAt(chunk, idx) - if err != nil { - if !errors.Is(io.EOF, err) { - panic(err) - } else { - fmt.Println("EOF") - } - } - - var j int - for j = l - 1; j > 0; j-- { - if chunk[j] == '\n' { - break - } - } - - idx = int64(j + 1) - - go workerComputeChunk(chunk[:j+1], i, results, wg) - } - wg.Wait() - - finalRS := make(resultSet) - - for _, rs := range results { - finalRS.merge(rs) - } - - os.WriteFile(resultFile, []byte(finalRS.String()), 0666) -} - func oneBRC() { bits, err := os.ReadFile(measurementsFile) if err != nil { @@ -171,6 +86,29 @@ func oneBRC() { os.WriteFile(resultFile, []byte(finalRS.String()), 0666) } +func validate() { + me, err := os.ReadFile(resultFile) + if err != nil { + panic(err) + } + + ref, err := os.ReadFile(benchFile) + if err != nil { + panic(err) + } + + meLines := bytes.Split(me, []byte{'\r', '\n'}) + refLines := bytes.Split(ref, []byte{'\r', '\n'}) + + for i, line := range meLines { + refLine := string(refLines[i]) + meLine := string(line) + if refLine != meLine { + fmt.Printf("Validation tripped: reference[%s]\tme[%s]\n", refLine, meLine) + } + } +} + func startCpuProfile() *os.File { name := fmt.Sprintf("prof-%s.pprof", time.Now().Format("15-04-05")) f, err := os.Create(name) @@ -218,11 +156,7 @@ func workerComputePartition(aData []byte, wPart partitionRange, workerNumber int continue } - temp, err := strconv.ParseFloat(string(line[di+1:][:len(line[di+1:])]), 64) - if err != nil { - fmt.Printf("failed to parse float[%s]: %+v", string(line[di+1:]), err) - return - } + temp := fParseFloat(line[di+1:][:len(line[di+1:])]) hasher.Write(line[:di]) key := hasher.Sum64() @@ -257,73 +191,36 @@ func workerComputePartition(aData []byte, wPart partitionRange, workerNumber int container[workerNumber] = rs } -func workerComputeChunk(aData []byte, workerNumber int, container []resultSet, wg *sync.WaitGroup) { - defer wg.Done() - wPart := partitionRange{start: 0, end: int64(len(aData))} - const delimiter = byte(';') +func fParseFloat(str []byte) float64 { + //kick everything off by computing the remainder + v := float64(fCharToInt(str[len(str)-1])) / 10.0 + positive := true + end := 0 - rs := make(resultSet) - - alloc := make([]result, 500) - aCnt := 0 - var di int - var line []byte - - hasher := fnv.New64() - - var start = wPart.start - var end int64 - for end = seekNextNewLine(aData, wPart, start); end <= wPart.end; end = seekNextNewLine(aData, wPart, start) { - - line = aData[start : end-2] - //we know there will be at minimum 0.0, so we can skip 3 bytes - for di = len(line) - 3; di > -1; di-- { - if line[di] == delimiter { - break - } - } - - if di < 0 { - continue - } - - temp, err := strconv.ParseFloat(string(line[di+1:][:len(line[di+1:])]), 64) - if err != nil { - fmt.Printf("failed to parse float[%s]: %+v", string(line[di+1:]), err) - return - } - - hasher.Write(line[:di]) - key := hasher.Sum64() - hasher.Reset() - r, ok := rs[key] - if !ok { - r = alloc[aCnt] - r.name = line[:di] - r.min = math.MaxFloat64 - r.max = math.SmallestNonzeroFloat64 - aCnt++ - } - r.count += 1.0 - - r.rAvg = ((r.rAvg * (r.count - 1.0)) + temp) / r.count - - if temp > r.max { - r.max = temp - } - - if temp < r.min { - r.min = temp - } - - rs[key] = r - start = end - if end == wPart.end { - break - } + if str[0] == '-' { + positive = false + end = 1 } - container[workerNumber] = rs + start := len(str) - 3 + oom := 0.0 + for i := start; i >= end; i-- { + v += float64(fCharToInt(str[i])) * math.Pow(10.0, oom) + oom += 1.0 + } + + if positive { + return v + } else { + return -v + } +} + +func fCharToInt(b byte) int { + if b >= '0' && b <= '9' { + return int(b - '0') + } + panic(fmt.Errorf("couldn't parse: %v", string(b))) } func seekNextNewLine(b []byte, part partitionRange, last int64) int64 {