From 737e8990c9ced665f4dec4868a6313ff292cb5e1 Mon Sep 17 00:00:00 2001 From: dtookey Date: Wed, 31 Jul 2024 18:44:22 -0400 Subject: [PATCH] checkpoint prior to streaming --- .gitignore | 2 + .idea/.gitignore | 8 + .idea/misc.xml | 6 + .idea/modules.xml | 8 + .idea/vcs.xml | 6 + 1brc-go.iml | 9 ++ 1brc.go | 364 ++++++++++++++++++++++++++++++++++++++++++++++ 1brc_test.go | 66 +++++++++ go.mod | 3 + 9 files changed, 472 insertions(+) create mode 100644 .gitignore create mode 100644 .idea/.gitignore create mode 100644 .idea/misc.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml create mode 100644 1brc-go.iml create mode 100644 1brc.go create mode 100644 1brc_test.go create mode 100644 go.mod diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..81eeea7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/measurements.txt +/results.txt diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..639900d --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..eb49754 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/1brc-go.iml b/1brc-go.iml new file mode 100644 index 0000000..eacc75a --- /dev/null +++ b/1brc-go.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/1brc.go b/1brc.go new file mode 100644 index 0000000..94b535f --- /dev/null +++ b/1brc.go @@ -0,0 +1,364 @@ +package main + +import ( + "bytes" + "fmt" + "hash/fnv" + "log" + "math" + "os" + "runtime/pprof" + "sort" + "strconv" + "strings" + "sync" + "time" +) + +//todo: +// bytes.split is doing a ton of allocation. I think we can do better by only returning references instead of doing +// a copy + +const measurementsFile = "measurements.txt" +const testFile = "dummy.txt" +const resultFile = "my_results.txt" +const benchFile = "results.txt" + +const profile = true + +var nGoRoutine = 2 + +const maxSeekLen = int64(100) + +type partitionRange struct { + start int64 + end int64 +} + +type resultSet map[uint64]result + +type result struct { + name []byte + count float64 + min float64 + max float64 + rAvg float64 +} + +var count = int64(0) + +func main() { + fmt.Printf("Starting with %d threads\n", nGoRoutine) + var f *os.File + if profile { + f = startCpuProfile() + } + _ = os.Remove(resultFile) + start := time.Now() + oneBRC() + finish := time.Now() + if profile { + stopProfiling(f) + } + fmt.Printf("Total ellapsed time: %.2fs\n", finish.Sub(start).Seconds()) + fmt.Printf("Total computed records: %d\n", count) + validate() +} + +func validate() { + me, err := os.ReadFile(resultFile) + if err != nil { + panic(err) + } + + ref, err := os.ReadFile(benchFile) + if err != nil { + panic(err) + } + + meLines := bytes.Split(me, []byte{'\r', '\n'}) + refLines := bytes.Split(ref, []byte{'\r', '\n'}) + + for i, line := range meLines { + refLine := string(refLines[i]) + meLine := string(line) + if refLine != meLine { + fmt.Printf("Validation tripped: reference[%s]\tme[%s]\n", refLine, meLine) + } + } +} + +func oneBRC() { + bits, err := os.ReadFile(measurementsFile) + if err != nil { + panic(err) + } + + parts := createPartitions(bits, '\n', nGoRoutine) + wg := &sync.WaitGroup{} + results := make([]resultSet, len(parts)) + + for i, part := range parts { + wg.Add(1) + go workerComputePartition(bits, part, i, results, wg) + } + wg.Wait() + + finalRS := make(resultSet) + + for _, rs := range results { + finalRS.merge(rs) + } + + os.WriteFile(resultFile, []byte(finalRS.String()), 0666) +} + +func startCpuProfile() *os.File { + name := fmt.Sprintf("prof-%s.pprof", time.Now().Format("15-04-05")) + f, err := os.Create(name) + if err != nil { + log.Fatal(err) + } + err = pprof.StartCPUProfile(f) + if err != nil { + log.Fatal(err) + } + return f +} + +func stopProfiling(f *os.File) { + pprof.StopCPUProfile() + f.Close() +} + +func workerComputePartition(aData []byte, wPart partitionRange, workerNumber int, container []resultSet, wg *sync.WaitGroup) { + defer wg.Done() + const delimiter = byte(';') + + rs := make(resultSet) + + alloc := make([]result, 500) + aCnt := 0 + var di int + var line []byte + + hasher := fnv.New64() + + var start = wPart.start + var end int64 + for end = seekNextNewLine(aData, wPart, start); end <= wPart.end; end = seekNextNewLine(aData, wPart, start) { + + line = aData[start : end-2] + //we know there will be at minimum 0.0, so we can skip 3 bytes + for di = len(line) - 3; di > -1; di-- { + if line[di] == delimiter { + break + } + } + + if di < 0 { + continue + } + + temp, err := strconv.ParseFloat(string(line[di+1:][:len(line[di+1:])]), 64) + if err != nil { + fmt.Printf("failed to parse float[%s]: %+v", string(line[di+1:]), err) + return + } + + hasher.Write(line[:di]) + key := hasher.Sum64() + hasher.Reset() + r, ok := rs[key] + if !ok { + r = alloc[aCnt] + r.name = line[:di] + r.min = math.MaxFloat64 + r.max = math.SmallestNonzeroFloat64 + aCnt++ + } + r.count += 1.0 + + r.rAvg = ((r.rAvg * (r.count - 1.0)) + temp) / r.count + + if temp > r.max { + r.max = temp + } + + if temp < r.min { + r.min = temp + } + + rs[key] = r + start = end + if end == wPart.end { + break + } + } + + container[workerNumber] = rs +} + +// createNewLineIndex +// we know that if we *just* returned a line, we can skip a few characters for known values +// each line has: +// - 5 x.x\r\n +// - 1 ; +// - 2 [a-z] first 2 characters of location +// +// for a total of 8 +func acreateNewLineIndex(b []byte, part partitionRange) []int64 { + idx := make([]int64, 0, 16*1024*1024) + const step = 8 + + for i := part.start; i < part.end; i++ { + if b[i] == '\n' { + idx = append(idx, i+1) + i += step + } + } + + return idx +} + +func seekNextNewLine(b []byte, part partitionRange, last int64) int64 { + const step = 8 + for i := last + step; i < part.end; { + switch b[i] { + case '\n': + return i + 1 + case '\r': + return i + 2 + case ';': // this will be minimum [;0.0\r] + i += 5 + default: + i += 2 + } + } + return part.end +} + +func createPartitions(data []byte, seekChar byte, nPart int) []partitionRange { + if len(data) == 0 || nPart == 0 { + return make([]partitionRange, nPart) + } + + if nPart == 0 { + return []partitionRange{} + } + + si := int64(0) + step := int64(len(data) / nPart) + tLen := int64(len(data)) + + partitions := make([]partitionRange, nPart) + + for i := 0; i < nPart; i++ { + start := si + + end := si + step + + if start > tLen { + return partitions[:len(partitions)-1] + } + + if end > tLen { + partitions[i] = partitionRange{start: start, end: tLen} + return partitions + } + + found := false + seekForward := end + maxSeekLen + //seek next new line + for j := end; j < min(seekForward, tLen); j++ { + if data[j] == seekChar { + if end == tLen { + //if the final character is also a split character we just return the rest of the chunk + end = tLen + } else { + end = j + 1 + } + found = true + break + } + } + + if !found { + //we'll start looking backward from the end point by the seek amount just in case we barely over-stepped + seekBack := max(0, end-maxSeekLen) + for j := end; j > seekBack; j-- { + if data[j] == seekChar { + end = j + 1 + found = true + break + } + } + if !found { + fmt.Printf("Could not find split in range [%d, %d]\n%s", seekBack, seekForward, string(data)[seekBack:seekForward]) + panic("thread died") + } + } + + partitions[i] = partitionRange{start: start, end: end} + si = end + + } + + return partitions +} + +func (r *result) merge(other *result) { + if other.max > r.max { + r.max = other.max + } + + if other.min < r.min { + r.min = other.min + } + + r.rAvg = ((r.rAvg * r.count) + (other.rAvg * other.count)) / (r.count + other.count) + r.count += other.count +} + +func (rs resultSet) merge(other resultSet) { + for k, v := range other { + tr, ok := rs[k] + if !ok { + rs[k] = v + continue + } + + tr.merge(&v) + + rs[k] = tr + } +} + +func (rs resultSet) String() string { + keys := make([]string, 0, len(rs)) + + for _, v := range rs { + keys = append(keys, string(v.name)) + } + + sort.Strings(keys) + b := &strings.Builder{} + hasher := fnv.New64() + for i, key := range keys { + hasher.Write([]byte(key)) + r := rs[hasher.Sum64()] + hasher.Reset() + count += int64(r.count) + b.WriteString(key) + b.WriteString("=") + b.WriteString(fmt.Sprintf("%.1f", r.min)) + b.WriteString("/") + b.WriteString(fmt.Sprintf("%.1f", r.rAvg)) + b.WriteString("/") + b.WriteString(fmt.Sprintf("%.1f", r.max)) + if i < len(keys)-1 { + b.WriteString(",\r\n") + } + } + return b.String() +} diff --git a/1brc_test.go b/1brc_test.go new file mode 100644 index 0000000..e87d5e1 --- /dev/null +++ b/1brc_test.go @@ -0,0 +1,66 @@ +package main + +import ( + "testing" +) + +func TestCreatePartitions(t *testing.T) { + tests := []struct { + name string + data []byte + seekChar byte + nPart int + expected []partitionRange + }{ + { + "NormalCase", + []byte("xxxx1xxxx1xxxx1xxxx1xxxx1xxxx1xxxx1xxxx1xxxx1"), + '1', + 3, + []partitionRange{{0, 15}, {15, 30}, {30, 45}}, + }, + { + "NoCharFound", + []byte("xxx1xxx1xxx1xxx1xxx1xxx1xxx1xxx1xxx1"), + 'z', + 3, + []partitionRange{{0, 12}, {12, 24}, {24, 36}}, + }, + { + "SingleCharData", + []byte{'g'}, + 'g', + 1, + []partitionRange{{0, 1}}, + }, + { + "EmptyData", + []byte{}, + 'g', + 3, + []partitionRange{{0, 0}, {0, 0}, {0, 0}}, + }, + { + "NoPartitions", + []byte("abcdefghijk"), + 'g', + 0, + []partitionRange{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := createPartitions(tt.data, tt.seekChar, tt.nPart) + if len(result) != len(tt.expected) { + t.Errorf("Got %v, expected %v", result, tt.expected) + } + + for i, val := range result { + if val != tt.expected[i] { + t.Errorf("At %v: got %v, expected %v", i, val, tt.expected[i]) + } + } + }) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..46f56c5 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module 1brc-go + +go 1.21