package main

import (
	"bytes"
	"fmt"
	"hash/fnv"
	"log"
	"math"
	"os"
	"runtime/pprof"
	"sort"
	"strings"
	"sync"
	"time"
)

const (
	// measurementsFile is the input: one "<station>;<temperature>" record per line.
	measurementsFile = "measurements.txt"
	// testFile is deleted (best effort) at startup.
	testFile = "dummy.txt"
	// resultFile receives the formatted summary written by oneBRC.
	resultFile = "my_results.txt"
	// benchFile holds the reference output that validate compares against.
	benchFile = "results.txt"
	// profile enables CPU profiling around oneBRC when true.
	profile = false
)

// nGoRoutine is the number of partitions (and worker goroutines) oneBRC uses.
var nGoRoutine = 64

// maxSeekLen is a legacy split-search window. createPartitions no longer
// needs it because it searches forward for the split character without a
// bound; it is kept so any existing references still compile.
const maxSeekLen = int64(100)

// partitionRange is the half-open byte range [start, end) of the input that
// one worker processes. Boundaries fall just after a line terminator (or at
// the end of the input).
type partitionRange struct {
	start int64
	end   int64
}

// resultSet maps the FNV-64 hash of a station name to its statistics.
type resultSet map[uint64]result

// fRS is a per-worker open-addressing hash table (linear probing) keyed by
// the FNV-64 hash of a station name. It grows by doubling whenever it becomes
// more than half full, so probing always terminates.
type fRS struct {
	v    []result // slot values
	keys []uint64 // slot keys, valid only where used[i] is true
	used []bool   // slot occupancy
	size int      // number of occupied slots
}

// newFRS returns an empty table. The capacity must be a power of two because
// keyMap reduces hashes with a bit mask.
func newFRS() *fRS {
	const size = 1 << 14
	return &fRS{
		v:    make([]result, size),
		keys: make([]uint64, size),
		used: make([]bool, size),
	}
}

// result accumulates statistics for one station.
type result struct {
	name  []byte  // station name; aliases the input buffer
	count float64 // number of samples seen
	min   float64
	max   float64
	sum   float64 // sum of all samples; the mean is sum/count
}

// count is the total number of records aggregated by oneBRC.
var count int64

// main runs oneBRC (optionally under the CPU profiler), reports timing and
// the record count, then validates the output against benchFile.
func main() {
	fmt.Printf("Starting with %d threads\n", nGoRoutine)
	var f *os.File
	if profile {
		f = startCpuProfile()
	}
	_ = os.Remove(testFile) // best effort: the file may legitimately not exist
	start := time.Now()
	oneBRC()
	finish := time.Now()
	if profile {
		stopProfiling(f)
	}
	fmt.Printf("Total ellapsed time: %.2fs\n", finish.Sub(start).Seconds())
	fmt.Printf("Total computed records: %d\n", count)
	validate()
}

// oneBRC reads measurementsFile, aggregates it in parallel (one goroutine per
// partition), merges the per-worker tables, adds the processed record count
// to the global count, and writes the sorted summary to resultFile.
// It panics on I/O errors.
func oneBRC() {
	bits, err := os.ReadFile(measurementsFile)
	if err != nil {
		panic(err)
	}
	parts := createPartitions(bits, '\n', nGoRoutine)
	results := make([]*fRS, len(parts))
	var wg sync.WaitGroup
	for i, part := range parts {
		wg.Add(1)
		go workerComputePartition(bits, part, i, results, &wg)
	}
	wg.Wait()

	finalRS := make(resultSet)
	for _, rs := range results {
		finalRS.merge(rs.toResultSet())
	}
	for _, r := range finalRS {
		count += int64(r.count)
	}
	if err := os.WriteFile(resultFile, []byte(finalRS.String()), 0666); err != nil {
		panic(err)
	}
}

// validate compares resultFile with benchFile line by line (CRLF-separated),
// prints every mismatching pair, including lines present in only one file,
// and reports whether the files matched. It panics if either file cannot be
// read.
func validate() {
	me, err := os.ReadFile(resultFile)
	if err != nil {
		panic(err)
	}
	ref, err := os.ReadFile(benchFile)
	if err != nil {
		panic(err)
	}
	sep := []byte{'\r', '\n'}
	meLines := bytes.Split(me, sep)
	refLines := bytes.Split(ref, sep)
	errs := 0
	for i := 0; i < max(len(meLines), len(refLines)); i++ {
		// A missing line compares as "", so length mismatches are reported
		// instead of panicking on an out-of-range index.
		var refLine, meLine string
		if i < len(refLines) {
			refLine = string(refLines[i])
		}
		if i < len(meLines) {
			meLine = string(meLines[i])
		}
		if refLine != meLine {
			errs++
			fmt.Printf("Validation tripped: reference[%s]\tme[%s]\n", refLine, meLine)
		}
	}
	fmt.Printf("Validation passed: %v\n", errs == 0)
}

// startCpuProfile creates a timestamped .pprof file and starts CPU profiling
// into it. It exits the process on failure.
func startCpuProfile() *os.File {
	name := fmt.Sprintf("prof-%s.pprof", time.Now().Format("15-04-05"))
	f, err := os.Create(name)
	if err != nil {
		log.Fatal(err)
	}
	if err := pprof.StartCPUProfile(f); err != nil {
		log.Fatal(err)
	}
	return f
}

// stopProfiling stops the CPU profile and closes its file, logging (not
// failing on) a close error.
func stopProfiling(f *os.File) {
	pprof.StopCPUProfile()
	if err := f.Close(); err != nil {
		log.Printf("closing profile %s: %v", f.Name(), err)
	}
}

// workerComputePartition aggregates the records in wPart, stores the table in
// container[workerNumber], and marks wg done. It is meant to run as a
// goroutine; each worker writes a distinct container slot.
func workerComputePartition(aData []byte, wPart partitionRange, workerNumber int, container []*fRS, wg *sync.WaitGroup) {
	defer wg.Done()
	container[workerNumber] = computePartition(aData, wPart)
}

// computePartition aggregates every "<name>;<temperature>" line in
// aData[wPart.start:wPart.end] into a new table. LF and CRLF terminators are
// accepted, the last line may be unterminated, and lines without a ';'
// (e.g. blank lines) are skipped. It panics via fParseFloat on a malformed
// temperature.
func computePartition(aData []byte, wPart partitionRange) *fRS {
	const delimiter = byte(';')
	rs := newFRS()
	hasher := fnv.New64()
	for start := wPart.start; start < wPart.end; {
		end := seekNextNewLine(aData, wPart, start)
		line := trimLineEnding(aData[start:end])
		start = end // advance before any skip so a bad line can never stall the loop

		// Temperatures never contain ';', so the last one separates the fields.
		di := bytes.LastIndexByte(line, delimiter)
		if di < 0 {
			continue
		}
		name := line[:di]
		temp := fParseFloat(line[di+1:])

		hasher.Reset()
		hasher.Write(name)
		key := hasher.Sum64()

		r, ok := rs.get(key)
		if !ok {
			// max starts at the most negative float so all-negative stations
			// report their real maximum.
			r = result{name: name, min: math.MaxFloat64, max: -math.MaxFloat64}
		}
		r.count++
		r.sum += temp
		r.min = min(r.min, temp)
		r.max = max(r.max, temp)
		rs.put(key, r)
	}
	return rs
}

// trimLineEnding strips one trailing "\n" and then one trailing "\r".
func trimLineEnding(line []byte) []byte {
	if n := len(line); n > 0 && line[n-1] == '\n' {
		line = line[:n-1]
	}
	if n := len(line); n > 0 && line[n-1] == '\r' {
		line = line[:n-1]
	}
	return line
}

// fParseFloat parses a temperature with exactly one fractional digit in one
// of the forms x.x, -x.x, xx.x or -xx.x. Any other length panics.
func fParseFloat(str []byte) float64 {
	switch len(str) {
	case 3: // x.x
		return float64(fCharToInt(str[0])) + float64(fCharToInt(str[2]))*0.1
	case 4:
		if str[0] == '-' { // -x.x
			return -(float64(fCharToInt(str[1])) + float64(fCharToInt(str[3]))*0.1)
		}
		// xx.x
		return float64(f2CharToInt(str[0], str[1])) + float64(fCharToInt(str[3]))*0.1
	case 5: // -xx.x
		return -(float64(f2CharToInt(str[1], str[2])) + float64(fCharToInt(str[4]))*0.1)
	default:
		panic(fmt.Errorf("bad float string: %s", string(str)))
	}
}

// fCharToInt converts an ASCII digit to its value.
func fCharToInt(b byte) int {
	return int(b - '0')
}

// f2CharToInt converts two ASCII digits (tens, ones) to their value.
func f2CharToInt(b1 byte, b2 byte) int {
	return int(b1-'0')*10 + int(b2-'0')
}

// seekNextNewLine returns the offset just past the first '\n' at or after
// last within part, or part.end if there is none. When last < part.end the
// result is always greater than last, so callers always make progress.
func seekNextNewLine(b []byte, part partitionRange, last int64) int64 {
	if last >= part.end {
		return part.end
	}
	if i := bytes.IndexByte(b[last:part.end], '\n'); i >= 0 {
		return last + int64(i) + 1
	}
	return part.end
}

// createPartitions splits data into at most nPart contiguous, non-empty
// ranges that together cover all of data. Every boundary except the final
// one falls just after an occurrence of seekChar. Empty data or nPart <= 0
// yields no partitions.
func createPartitions(data []byte, seekChar byte, nPart int) []partitionRange {
	if len(data) == 0 || nPart <= 0 {
		return []partitionRange{}
	}
	tLen := int64(len(data))
	step := max(tLen/int64(nPart), 1) // at least one byte so each partition advances
	partitions := make([]partitionRange, 0, nPart)
	for start := int64(0); start < tLen; {
		end := start + step
		switch {
		case end >= tLen || len(partitions) == nPart-1:
			// The last allowed partition always absorbs the remainder, so no
			// trailing data is dropped.
			end = tLen
		default:
			if i := bytes.IndexByte(data[end:], seekChar); i >= 0 {
				end += int64(i) + 1
			} else {
				end = tLen
			}
		}
		partitions = append(partitions, partitionRange{start: start, end: end})
		start = end
	}
	return partitions
}

// mean returns the average sample value, or 0 when no samples were recorded.
func (r *result) mean() float64 {
	if r.count == 0 {
		return 0
	}
	return r.sum / r.count
}

// merge folds other's statistics into r. Names are assumed to match.
func (r *result) merge(other *result) {
	r.max = max(r.max, other.max)
	r.min = min(r.min, other.min)
	r.sum += other.sum
	r.count += other.count
}

// merge folds every entry of other into rs, combining entries with the same key.
func (rs resultSet) merge(other resultSet) {
	for k, v := range other {
		tr, ok := rs[k]
		if !ok {
			rs[k] = v
			continue
		}
		tr.merge(&v)
		rs[k] = tr
	}
}

// String formats the set as "name=min/mean/max" entries sorted bytewise by
// name, each value with one decimal, joined by ",\r\n". It has no side effects.
func (rs resultSet) String() string {
	rows := make([]result, 0, len(rs))
	for _, r := range rs {
		rows = append(rows, r)
	}
	sort.Slice(rows, func(i, j int) bool { return bytes.Compare(rows[i].name, rows[j].name) < 0 })

	b := &strings.Builder{}
	for i := range rows {
		if i > 0 {
			b.WriteString(",\r\n")
		}
		r := &rows[i]
		fmt.Fprintf(b, "%s=%.1f/%.1f/%.1f", r.name, r.min, r.mean(), r.max)
	}
	return b.String()
}

// slot returns the index holding key, or the empty slot where key belongs.
// It terminates because put keeps the table at most half full.
func (f *fRS) slot(key uint64) uint64 {
	mask := uint64(len(f.v) - 1)
	i := f.keyMap(key)
	for f.used[i] && f.keys[i] != key {
		i = (i + 1) & mask
	}
	return i
}

// get returns the value stored for key and whether it was present.
func (f *fRS) get(key uint64) (result, bool) {
	i := f.slot(key)
	return f.v[i], f.used[i]
}

// put inserts or replaces the value for key, growing the table when it
// becomes more than half full.
func (f *fRS) put(key uint64, v result) {
	i := f.slot(key)
	if !f.used[i] {
		f.used[i] = true
		f.keys[i] = key
		f.size++
	}
	f.v[i] = v
	if 2*f.size > len(f.v) {
		f.grow()
	}
}

// grow doubles the capacity (keeping it a power of two) and reinserts all entries.
func (f *fRS) grow() {
	oldV, oldKeys, oldUsed := f.v, f.keys, f.used
	n := 2 * len(oldV)
	f.v, f.keys, f.used, f.size = make([]result, n), make([]uint64, n), make([]bool, n), 0
	for i, ok := range oldUsed {
		if ok {
			f.put(oldKeys[i], oldV[i])
		}
	}
}

// keyMap reduces a hash to a home slot. The high half is folded into the low
// half before masking so all 64 hash bits influence placement.
func (f *fRS) keyMap(key uint64) uint64 {
	return (key ^ (key >> 32)) & uint64(len(f.v)-1)
}

// getKeys returns every stored key (including 0) in slot order.
func (f *fRS) getKeys() []uint64 {
	keys := make([]uint64, 0, f.size)
	for i, ok := range f.used {
		if ok {
			keys = append(keys, f.keys[i])
		}
	}
	return keys
}

// toResultSet copies the table's entries into a map.
func (f *fRS) toResultSet() resultSet {
	rs := make(resultSet, f.size)
	for i, ok := range f.used {
		if ok {
			rs[f.keys[i]] = f.v[i]
		}
	}
	return rs
}