diff --git a/1brc.go b/1brc.go index 94b535f..c7ddf4f 100644 --- a/1brc.go +++ b/1brc.go @@ -2,8 +2,10 @@ package main import ( "bytes" + "errors" "fmt" "hash/fnv" + "io" "log" "math" "os" @@ -24,9 +26,9 @@ const testFile = "dummy.txt" const resultFile = "my_results.txt" const benchFile = "results.txt" -const profile = true +const profile = false -var nGoRoutine = 2 +var nGoRoutine = 64 const maxSeekLen = int64(100) @@ -53,9 +55,11 @@ func main() { if profile { f = startCpuProfile() } + _ = os.Remove(resultFile) start := time.Now() oneBRC() + //stream1BRC() finish := time.Now() if profile { stopProfiling(f) @@ -88,6 +92,60 @@ func validate() { } } +func stream1BRC() { + + f, err := os.OpenFile(measurementsFile, os.O_RDONLY, 0400) + if err != nil { + panic(err) + } + defer f.Close() + + info, err := f.Stat() + if err != nil { + panic(err) + } + + idealChunkSize := (info.Size() / int64(nGoRoutine-1)) + + wg := &sync.WaitGroup{} + results := make([]resultSet, nGoRoutine) + + idx := int64(0) + + for i := 0; i < nGoRoutine; i++ { + wg.Add(1) + chunk := make([]byte, idealChunkSize) + l, err := f.ReadAt(chunk, idx) + if err != nil { + if !errors.Is(err, io.EOF) { + panic(err) + } else { + fmt.Println("EOF") + } + } + + var j int + for j = l - 1; j > 0; j-- { + if chunk[j] == '\n' { + break + } + } + + idx += int64(j + 1) + + go workerComputeChunk(chunk[:j+1], i, results, wg) + } + wg.Wait() + + finalRS := make(resultSet) + + for _, rs := range results { + finalRS.merge(rs) + } + + os.WriteFile(resultFile, []byte(finalRS.String()), 0666) +} + func oneBRC() { bits, err := os.ReadFile(measurementsFile) if err != nil { @@ -199,26 +257,73 @@ func workerComputePartition(aData []byte, wPart partitionRange, workerNumber int container[workerNumber] = rs } -// createNewLineIndex -// we know that if we *just* returned a line, we can skip a few characters for known values -// each line has: -// - 5 x.x\r\n -// - 1 ; -// - 2 [a-z] first 2 characters of location -// -// for a total of 8 
-func acreateNewLineIndex(b []byte, part partitionRange) []int64 { - idx := make([]int64, 0, 16*1024*1024) - const step = 8 +func workerComputeChunk(aData []byte, workerNumber int, container []resultSet, wg *sync.WaitGroup) { + defer wg.Done() + wPart := partitionRange{start: 0, end: int64(len(aData))} + const delimiter = byte(';') - for i := part.start; i < part.end; i++ { - if b[i] == '\n' { - idx = append(idx, i+1) - i += step + rs := make(resultSet) + + alloc := make([]result, 500) + aCnt := 0 + var di int + var line []byte + + hasher := fnv.New64() + + var start = wPart.start + var end int64 + for end = seekNextNewLine(aData, wPart, start); end <= wPart.end; end = seekNextNewLine(aData, wPart, start) { + + line = aData[start : end-2] + //we know there will be at minimum 0.0, so we can skip 3 bytes + for di = len(line) - 3; di > -1; di-- { + if line[di] == delimiter { + break + } + } + + if di < 0 { + continue + } + + temp, err := strconv.ParseFloat(string(line[di+1:][:len(line[di+1:])]), 64) + if err != nil { + fmt.Printf("failed to parse float[%s]: %+v", string(line[di+1:]), err) + return + } + + hasher.Write(line[:di]) + key := hasher.Sum64() + hasher.Reset() + r, ok := rs[key] + if !ok { + r = alloc[aCnt] + r.name = line[:di] + r.min = math.MaxFloat64 + r.max = -math.MaxFloat64 + aCnt++ + } + r.count += 1.0 + + r.rAvg = ((r.rAvg * (r.count - 1.0)) + temp) / r.count + + if temp > r.max { + r.max = temp + } + + if temp < r.min { + r.min = temp + } + + rs[key] = r + start = end + if end == wPart.end { + break } } - return idx + container[workerNumber] = rs } func seekNextNewLine(b []byte, part partitionRange, last int64) int64 {