Starting with 64 threads
Total ellapsed time: 6.61s Total computed records: 1000000000 Validation tripped: reference[Chi?in?u=-41.2/10.2/59.0,] me[Chișinău=-41.2/10.2/59.0,] Validation tripped: reference[Flores, Petén=-23.2/26.4/76.4,] me[Flores, Petén=-23.2/26.4/76.4,] Validation tripped: reference[Suwa?ki=-44.1/7.2/57.6,] me[Suwałki=-44.1/7.2/57.6,] Validation tripped: reference[Troms?=-46.9/2.9/54.1,] me[Tromsø=-46.9/2.9/54.1,] Validation tripped: reference[Wroc?aw=-42.9/9.6/57.8,] me[Wrocław=-42.9/9.6/57.8,] Validation tripped: reference[?zmir=-33.2/17.9/66.4] me[İzmir=-33.2/17.9/66.4]
This commit is contained in:
parent
737e8990c9
commit
3c13939717
141
1brc.go
141
1brc.go
@ -2,8 +2,10 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"hash/fnv"
|
"hash/fnv"
|
||||||
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"math"
|
"math"
|
||||||
"os"
|
"os"
|
||||||
@ -24,9 +26,9 @@ const testFile = "dummy.txt"
|
|||||||
const resultFile = "my_results.txt"
|
const resultFile = "my_results.txt"
|
||||||
const benchFile = "results.txt"
|
const benchFile = "results.txt"
|
||||||
|
|
||||||
const profile = true
|
const profile = false
|
||||||
|
|
||||||
var nGoRoutine = 2
|
var nGoRoutine = 64
|
||||||
|
|
||||||
const maxSeekLen = int64(100)
|
const maxSeekLen = int64(100)
|
||||||
|
|
||||||
@ -53,9 +55,11 @@ func main() {
|
|||||||
if profile {
|
if profile {
|
||||||
f = startCpuProfile()
|
f = startCpuProfile()
|
||||||
}
|
}
|
||||||
|
|
||||||
_ = os.Remove(resultFile)
|
_ = os.Remove(resultFile)
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
oneBRC()
|
oneBRC()
|
||||||
|
//stream1BRC()
|
||||||
finish := time.Now()
|
finish := time.Now()
|
||||||
if profile {
|
if profile {
|
||||||
stopProfiling(f)
|
stopProfiling(f)
|
||||||
@ -88,6 +92,60 @@ func validate() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func stream1BRC() {
|
||||||
|
|
||||||
|
f, err := os.OpenFile(measurementsFile, os.O_RDONLY, 0400)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
info, err := f.Stat()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
idealChunkSize := (info.Size() / int64(nGoRoutine-1))
|
||||||
|
|
||||||
|
wg := &sync.WaitGroup{}
|
||||||
|
results := make([]resultSet, nGoRoutine)
|
||||||
|
|
||||||
|
idx := int64(0)
|
||||||
|
|
||||||
|
for i := 0; i < nGoRoutine; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
chunk := make([]byte, idealChunkSize)
|
||||||
|
l, err := f.ReadAt(chunk, idx)
|
||||||
|
if err != nil {
|
||||||
|
if !errors.Is(io.EOF, err) {
|
||||||
|
panic(err)
|
||||||
|
} else {
|
||||||
|
fmt.Println("EOF")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var j int
|
||||||
|
for j = l - 1; j > 0; j-- {
|
||||||
|
if chunk[j] == '\n' {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
idx = int64(j + 1)
|
||||||
|
|
||||||
|
go workerComputeChunk(chunk[:j+1], i, results, wg)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
finalRS := make(resultSet)
|
||||||
|
|
||||||
|
for _, rs := range results {
|
||||||
|
finalRS.merge(rs)
|
||||||
|
}
|
||||||
|
|
||||||
|
os.WriteFile(resultFile, []byte(finalRS.String()), 0666)
|
||||||
|
}
|
||||||
|
|
||||||
func oneBRC() {
|
func oneBRC() {
|
||||||
bits, err := os.ReadFile(measurementsFile)
|
bits, err := os.ReadFile(measurementsFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -199,26 +257,73 @@ func workerComputePartition(aData []byte, wPart partitionRange, workerNumber int
|
|||||||
container[workerNumber] = rs
|
container[workerNumber] = rs
|
||||||
}
|
}
|
||||||
|
|
||||||
// createNewLineIndex
|
func workerComputeChunk(aData []byte, workerNumber int, container []resultSet, wg *sync.WaitGroup) {
|
||||||
// we know that if we *just* returned a line, we can skip a few characters for known values
|
defer wg.Done()
|
||||||
// each line has:
|
wPart := partitionRange{start: 0, end: int64(len(aData))}
|
||||||
// - 5 x.x\r\n
|
const delimiter = byte(';')
|
||||||
// - 1 ;
|
|
||||||
// - 2 [a-z] first 2 characters of location
|
|
||||||
//
|
|
||||||
// for a total of 8
|
|
||||||
func acreateNewLineIndex(b []byte, part partitionRange) []int64 {
|
|
||||||
idx := make([]int64, 0, 16*1024*1024)
|
|
||||||
const step = 8
|
|
||||||
|
|
||||||
for i := part.start; i < part.end; i++ {
|
rs := make(resultSet)
|
||||||
if b[i] == '\n' {
|
|
||||||
idx = append(idx, i+1)
|
alloc := make([]result, 500)
|
||||||
i += step
|
aCnt := 0
|
||||||
|
var di int
|
||||||
|
var line []byte
|
||||||
|
|
||||||
|
hasher := fnv.New64()
|
||||||
|
|
||||||
|
var start = wPart.start
|
||||||
|
var end int64
|
||||||
|
for end = seekNextNewLine(aData, wPart, start); end <= wPart.end; end = seekNextNewLine(aData, wPart, start) {
|
||||||
|
|
||||||
|
line = aData[start : end-2]
|
||||||
|
//we know there will be at minimum 0.0, so we can skip 3 bytes
|
||||||
|
for di = len(line) - 3; di > -1; di-- {
|
||||||
|
if line[di] == delimiter {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if di < 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
temp, err := strconv.ParseFloat(string(line[di+1:][:len(line[di+1:])]), 64)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("failed to parse float[%s]: %+v", string(line[di+1:]), err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
hasher.Write(line[:di])
|
||||||
|
key := hasher.Sum64()
|
||||||
|
hasher.Reset()
|
||||||
|
r, ok := rs[key]
|
||||||
|
if !ok {
|
||||||
|
r = alloc[aCnt]
|
||||||
|
r.name = line[:di]
|
||||||
|
r.min = math.MaxFloat64
|
||||||
|
r.max = math.SmallestNonzeroFloat64
|
||||||
|
aCnt++
|
||||||
|
}
|
||||||
|
r.count += 1.0
|
||||||
|
|
||||||
|
r.rAvg = ((r.rAvg * (r.count - 1.0)) + temp) / r.count
|
||||||
|
|
||||||
|
if temp > r.max {
|
||||||
|
r.max = temp
|
||||||
|
}
|
||||||
|
|
||||||
|
if temp < r.min {
|
||||||
|
r.min = temp
|
||||||
|
}
|
||||||
|
|
||||||
|
rs[key] = r
|
||||||
|
start = end
|
||||||
|
if end == wPart.end {
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return idx
|
container[workerNumber] = rs
|
||||||
}
|
}
|
||||||
|
|
||||||
func seekNextNewLine(b []byte, part partitionRange, last int64) int64 {
|
func seekNextNewLine(b []byte, part partitionRange, last int64) int64 {
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user