From 3c139397179d8743eb42528ae5de3a2fcdc34960 Mon Sep 17 00:00:00 2001 From: dtookey Date: Wed, 31 Jul 2024 19:45:30 -0400 Subject: [PATCH] =?UTF-8?q?Starting=20with=2064=20threads=20Total=20ellaps?= =?UTF-8?q?ed=20time:=206.61s=20Total=20computed=20records:=201000000000?= =?UTF-8?q?=20Validation=20tripped:=20reference[Chi=3Fin=3Fu=3D-41.2/10.2/?= =?UTF-8?q?59.0,]=09me[Chi=C8=99in=C4=83u=3D-41.2/10.2/59.0,]=20Validation?= =?UTF-8?q?=20tripped:=20reference[Flores,=20Pet=C3=A9n=3D-23.2/26.4/76.4,?= =?UTF-8?q?]=09me[Flores,=20=20Pet=C3=A9n=3D-23.2/26.4/76.4,]=20Validation?= =?UTF-8?q?=20tripped:=20reference[Suwa=3Fki=3D-44.1/7.2/57.6,]=09me[Suwa?= =?UTF-8?q?=C5=82ki=3D-44.1/7.2/57.6,]=20Validation=20tripped:=20reference?= =?UTF-8?q?[Troms=3F=3D-46.9/2.9/54.1,]=09me[Troms=C3=B8=3D-46.9/2.9/54.1,?= =?UTF-8?q?]=20Validation=20tripped:=20reference[Wroc=3Faw=3D-42.9/9.6/57.?= =?UTF-8?q?8,]=09me[Wroc=C5=82aw=3D-42.9/9.6/57.8,]=20Validation=20tripped?= =?UTF-8?q?:=20reference[=3Fzmir=3D-33.2/17.9/66.4]=09me[=C4=B0zmir=3D-33.?= =?UTF-8?q?2/17.9/66.4]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 1brc.go | 141 ++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 123 insertions(+), 18 deletions(-) diff --git a/1brc.go b/1brc.go index 94b535f..c7ddf4f 100644 --- a/1brc.go +++ b/1brc.go @@ -2,8 +2,10 @@ package main import ( "bytes" + "errors" "fmt" "hash/fnv" + "io" "log" "math" "os" @@ -24,9 +26,9 @@ const testFile = "dummy.txt" const resultFile = "my_results.txt" const benchFile = "results.txt" -const profile = true +const profile = false -var nGoRoutine = 2 +var nGoRoutine = 64 const maxSeekLen = int64(100) @@ -53,9 +55,11 @@ func main() { if profile { f = startCpuProfile() } + _ = os.Remove(resultFile) start := time.Now() oneBRC() + //stream1BRC() finish := time.Now() if profile { stopProfiling(f) @@ -88,6 +92,60 @@ func validate() { } } +func stream1BRC() { + + f, err := os.OpenFile(measurementsFile, os.O_RDONLY, 0400) + if err != nil { + panic(err) + } + defer f.Close() + + info, err := f.Stat() + if err != nil { + panic(err) + } + + idealChunkSize := (info.Size() / int64(nGoRoutine-1)) + + wg := &sync.WaitGroup{} + results := make([]resultSet, nGoRoutine) + + idx := int64(0) + + for i := 0; i < nGoRoutine; i++ { + wg.Add(1) + chunk := make([]byte, idealChunkSize) + l, err := f.ReadAt(chunk, idx) + if err != nil { + if !errors.Is(io.EOF, err) { + panic(err) + } else { + fmt.Println("EOF") + } + } + + var j int + for j = l - 1; j > 0; j-- { + if chunk[j] == '\n' { + break + } + } + + idx = int64(j + 1) + + go workerComputeChunk(chunk[:j+1], i, results, wg) + } + wg.Wait() + + finalRS := make(resultSet) + + for _, rs := range results { + finalRS.merge(rs) + } + + os.WriteFile(resultFile, []byte(finalRS.String()), 0666) +} + func oneBRC() { bits, err := os.ReadFile(measurementsFile) if err != nil { @@ -199,26 +257,73 @@ func workerComputePartition(aData []byte, wPart partitionRange, workerNumber int container[workerNumber] = rs } -// createNewLineIndex -// we know that if we *just* returned a line, we can skip a few characters for known values -// each line has: -// - 5 x.x\r\n -// - 1 ; -// - 2 [a-z] first 2 characters of location -// -// for a total of 8 -func acreateNewLineIndex(b []byte, part partitionRange) []int64 { - idx := make([]int64, 0, 16*1024*1024) - const step = 8 +func workerComputeChunk(aData []byte, workerNumber int, container []resultSet, wg *sync.WaitGroup) { + defer wg.Done() + wPart := partitionRange{start: 0, end: int64(len(aData))} + const delimiter = byte(';') - for i := part.start; i < part.end; i++ { - if b[i] == '\n' { - idx = append(idx, i+1) - i += step + rs := make(resultSet) + + alloc := make([]result, 500) + aCnt := 0 + var di int + var line []byte + + hasher := fnv.New64() + + var start = wPart.start + var end int64 + for end = seekNextNewLine(aData, wPart, start); end <= wPart.end; end = seekNextNewLine(aData, wPart, start) { + + line = aData[start : end-2] + //we know there will be at minimum 0.0, so we can skip 3 bytes + for di = len(line) - 3; di > -1; di-- { + if line[di] == delimiter { + break + } + } + + if di < 0 { + continue + } + + temp, err := strconv.ParseFloat(string(line[di+1:][:len(line[di+1:])]), 64) + if err != nil { + fmt.Printf("failed to parse float[%s]: %+v", string(line[di+1:]), err) + return + } + + hasher.Write(line[:di]) + key := hasher.Sum64() + hasher.Reset() + r, ok := rs[key] + if !ok { + r = alloc[aCnt] + r.name = line[:di] + r.min = math.MaxFloat64 + r.max = math.SmallestNonzeroFloat64 + aCnt++ + } + r.count += 1.0 + + r.rAvg = ((r.rAvg * (r.count - 1.0)) + temp) / r.count + + if temp > r.max { + r.max = temp + } + + if temp < r.min { + r.min = temp + } + + rs[key] = r + start = end + if end == wPart.end { + break } } - return idx + container[workerNumber] = rs } func seekNextNewLine(b []byte, part partitionRange, last int64) int64 {