I'm writing a POC to process a very large text file (~1 billion+ lines) and am experimenting with Go for this:
package main
import (
"bufio"
"fmt"
"log"
"os"
"time"
)
// main opens dump10.txt, scans it line by line, and starts one goroutine per
// line that prints the line; it finally prints the elapsed wall-clock time.
//
// NOTE(review): the number of goroutines started is bounded only by the
// number of lines in the file, and main never waits for them, so lines still
// in flight when main returns may not be printed. The unbounded fan-out is
// presumably what triggers the "too many concurrent operations" panic quoted
// below -- confirm against the runtime's poller limits.
func main() {
start := time.Now()
file, err := os.Open("dump10.txt")
if err != nil {
log.Fatal(err)
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
// One goroutine per scanned line; nothing limits how many run at once.
go fmt.Println(scanner.Text())
}
// Scan returns false on EOF and on error; Err distinguishes the two.
if err := scanner.Err(); err != nil {
log.Fatal(err)
}
secs := time.Since(start).Seconds()
fmt.Printf("Took %.2fs", secs)
}
However, when running this I get this error:
panic: too many concurrent operations on a single file or socket (max 1048575)
I haven't found anything online that deals with this specific error. I'm not sure whether it's a file descriptor issue; the maximum listed in the error is much higher than my ulimit -n
limit of 500,000.
What is the best way to do this?
In case it's not obvious, fmt.Println
is a stand-in for the actual function I will call when processing the data.
Before parallelizing a process, you should study your input and computations to make sure that it makes sense.
An input that must be processed in order is not a good match, because parallel processing would require additional, complex logic to keep things in order, and it is difficult to evaluate upfront whether this strategy will be a win.
Also, in order to take advantage of parallelization, the computations to run must take longer than the time required to synchronize the parallel tasks. It is possible to offset this cost by batching the data, but the resulting algorithm will be more complex and will introduce additional undesired side effects (like allocations).
Otherwise, don't parallelize.
See below examples of various implementations with long/short computation times and their resulting benchmarks.
The conclusion is that unless you compute a long running asynchronous task that will clearly outweigh the synchronization costs, sequential processing is faster.
main.go
package main
import (
"bufio"
"fmt"
"io"
"runtime"
"strings"
"sync"
"time"
)
// main runs every processing strategy once over the same in-memory sample
// (1000 lines of 1000 "a" characters each) with statistics reporting enabled.
func main() {
	line := strings.Repeat("a", 1000) + "\n"
	data := strings.Repeat(line, 1000)

	// The order matters only for the order of the printed reports.
	runners := []func(string, bool){
		run_line_short,
		run_line_long,
		run_line_short_workers,
		run_line_long_workers,
		run_bulk_short,
		run_bulk_long,
		run_seq_short,
		run_seq_long,
	}
	for _, run := range runners {
		run(data, true)
	}
}
// run_line_short feeds data through process using line_handler_short.
// When stat is true, a stats report named "run_line_short" is printed once
// the run finishes. It panics if processing returns an error.
func run_line_short(data string, stat bool) {
	if stat {
		// stats prints its header and starts timing now; the returned
		// reporter runs when this function returns.
		defer stats("run_line_short")()
	}
	if err := process(strings.NewReader(data), line_handler_short); err != nil {
		panic(err)
	}
}
// run_line_long feeds data through process using line_handler_long.
// When stat is true, a stats report named "run_line_long" is printed once
// the run finishes. It panics if processing returns an error.
func run_line_long(data string, stat bool) {
	if stat {
		// stats prints its header and starts timing now; the returned
		// reporter runs when this function returns.
		defer stats("run_line_long")()
	}
	if err := process(strings.NewReader(data), line_handler_long); err != nil {
		panic(err)
	}
}
// run_line_short_workers feeds data through processWorkers using
// line_handler_short. When stat is true, a stats report named
// "run_line_short_workers" is printed once the run finishes. It panics if
// processing returns an error.
func run_line_short_workers(data string, stat bool) {
	if stat {
		// stats prints its header and starts timing now; the returned
		// reporter runs when this function returns.
		defer stats("run_line_short_workers")()
	}
	if err := processWorkers(strings.NewReader(data), line_handler_short); err != nil {
		panic(err)
	}
}
// run_line_long_workers feeds data through processWorkers using
// line_handler_long. When stat is true, a stats report named
// "run_line_long_workers" is printed once the run finishes. It panics if
// processing returns an error.
func run_line_long_workers(data string, stat bool) {
	if stat {
		// stats prints its header and starts timing now; the returned
		// reporter runs when this function returns.
		defer stats("run_line_long_workers")()
	}
	if err := processWorkers(strings.NewReader(data), line_handler_long); err != nil {
		panic(err)
	}
}
// run_bulk_short feeds data through processBulk using bulk_handler_short.
// When stat is true, a stats report named "run_bulk_short" is printed once
// the run finishes. It panics if processing returns an error.
func run_bulk_short(data string, stat bool) {
	if stat {
		// stats prints its header and starts timing now; the returned
		// reporter runs when this function returns.
		defer stats("run_bulk_short")()
	}
	if err := processBulk(strings.NewReader(data), bulk_handler_short); err != nil {
		panic(err)
	}
}
// run_bulk_long feeds data through processBulk using bulk_handler_long.
// When stat is true, a stats report named "run_bulk_long" is printed once
// the run finishes. It panics if processing returns an error.
func run_bulk_long(data string, stat bool) {
	if stat {
		// stats prints its header and starts timing now; the returned
		// reporter runs when this function returns.
		defer stats("run_bulk_long")()
	}
	if err := processBulk(strings.NewReader(data), bulk_handler_long); err != nil {
		panic(err)
	}
}
// run_seq_short feeds data through processSeq using line_handler_short.
// When stat is true, a stats report named "run_seq_short" is printed once
// the run finishes. It panics if processing returns an error.
func run_seq_short(data string, stat bool) {
	if stat {
		// stats prints its header and starts timing now; the returned
		// reporter runs when this function returns.
		defer stats("run_seq_short")()
	}
	if err := processSeq(strings.NewReader(data), line_handler_short); err != nil {
		panic(err)
	}
}
// run_seq_long feeds data through processSeq using line_handler_long.
// When stat is true, a stats report named "run_seq_long" is printed once
// the run finishes. It panics if processing returns an error.
func run_seq_long(data string, stat bool) {
	if stat {
		// stats prints its header and starts timing now; the returned
		// reporter runs when this function returns.
		defer stats("run_seq_long")()
	}
	if err := processSeq(strings.NewReader(data), line_handler_long); err != nil {
		panic(err)
	}
}
// line_handler_short is the cheap per-line handler: it only takes the length
// of the line, discards it, and always returns nil.
func line_handler_short(k string) error {
	size := len(k)
	_ = size
	return nil
}
// line_handler_long is the slow per-line handler: it blocks for 5ms on a
// timer channel, then takes the length of the line, discards it, and returns
// nil.
func line_handler_long(k string) error {
	const delay = 5 * time.Millisecond
	expired := time.After(delay)
	<-expired

	size := len(k)
	_ = size
	return nil
}
// bulk_handler_short is the cheap batch handler: it takes the length of every
// line in b, discards it, and always returns nil.
func bulk_handler_short(b []string) error {
	for i := range b {
		size := len(b[i])
		_ = size
	}
	return nil
}
// bulk_handler_long is the slow batch handler: it blocks for 5ms once per
// batch on a timer channel, then takes the length of every line in b,
// discards it, and returns nil.
func bulk_handler_long(b []string) error {
	const delay = 5 * time.Millisecond
	expired := time.After(delay)
	<-expired

	for i := range b {
		size := len(b[i])
		_ = size
	}
	return nil
}
// stats prints a separator and name immediately, records the current time,
// and returns a reporter. Calling the reporter prints the time elapsed since
// stats was called followed by a selection of runtime.MemStats counters
// (byte counts are shown in whole MB, i.e. divided by 1024*1024).
func stats(name string) func() {
	const mb = 1024 * 1024

	fmt.Printf("======================\n")
	fmt.Printf("%v\n", name)
	began := time.Now()

	report := func() {
		elapsed := time.Since(began)
		fmt.Printf("time to run %v\n", elapsed)

		var mem runtime.MemStats
		runtime.ReadMemStats(&mem)

		fmt.Printf("Alloc: %d MB, TotalAlloc: %d MB, Sys: %d MB\n",
			mem.Alloc/mb, mem.TotalAlloc/mb, mem.Sys/mb)
		fmt.Printf("Mallocs: %d, Frees: %d\n",
			mem.Mallocs, mem.Frees)
		fmt.Printf("HeapAlloc: %d MB, HeapSys: %d MB, HeapIdle: %d MB\n",
			mem.HeapAlloc/mb, mem.HeapSys/mb, mem.HeapIdle/mb)
		fmt.Printf("HeapObjects: %d\n", mem.HeapObjects)
		fmt.Printf("\n")
	}
	return report
}
// process scans r line by line and runs h on each line in its own goroutine,
// with at most 4 handler goroutines in flight at any time. Lines are not
// guaranteed to be handled in input order.
//
// Every line is processed even if some handlers fail. The returned error is
// the first one received from a handler or from the scanner, or nil if there
// was none.
func process(r io.Reader, h func(string) error) error {
	const maxInFlight = 4

	var (
		errc    = make(chan error)
		sem     = make(chan struct{}, maxInFlight)
		pending sync.WaitGroup
	)

	handle := func(line string) {
		defer pending.Done()
		defer func() { <-sem }() // give the token back once the line is done
		if err := h(line); err != nil {
			errc <- err
		}
	}

	go func() {
		// Closing errc is what lets the collecting loop below terminate.
		defer close(errc)

		sc := bufio.NewScanner(r)
		for sc.Scan() {
			sem <- struct{}{} // blocks while maxInFlight handlers are running
			pending.Add(1)
			go handle(sc.Text())
		}
		pending.Wait()
		if err := sc.Err(); err != nil {
			errc <- err
		}
	}()

	var first error
	for e := range errc {
		if first != nil || e == nil {
			continue
		}
		first = e
	}
	return first
}
// processWorkers scans r line by line and hands each line to a fixed pool of
// 4 worker goroutines that call h. Lines are not guaranteed to be handled in
// input order.
//
// Every line is processed even if some handlers fail. The returned error is
// the first one received from a handler or from the scanner, or nil if there
// was none.
func processWorkers(r io.Reader, h func(string) error) error {
	const workerCount = 4

	errc := make(chan error)
	lines := make(chan string)

	var workers sync.WaitGroup
	workers.Add(workerCount)
	for w := 0; w < workerCount; w++ {
		go func() {
			defer workers.Done()
			for line := range lines {
				if err := h(line); err != nil {
					errc <- err
				}
			}
		}()
	}

	go func() {
		// Closing errc is what lets the collecting loop below terminate.
		defer close(errc)

		sc := bufio.NewScanner(r)
		for sc.Scan() {
			lines <- sc.Text()
		}
		close(lines) // no more work: lets the workers' range loops end
		workers.Wait()
		if err := sc.Err(); err != nil {
			errc <- err
		}
	}()

	var first error
	for e := range errc {
		if first != nil || e == nil {
			continue
		}
		first = e
	}
	return first
}
// processBulk scans r line by line, groups the lines into batches of up to 50
// and hands each batch to a fixed pool of 4 worker goroutines that call h.
// The final batch may be shorter. Batches are not guaranteed to be handled in
// input order, and each batch slice is owned by the handler that receives it.
//
// Every batch is processed even if some handlers fail. The returned error is
// the first one received from a handler or from the scanner, or nil if there
// was none.
func processBulk(r io.Reader, h func([]string) error) error {
	const (
		workerCount = 4
		batchSize   = 50
	)

	errs := make(chan error)
	input := make(chan []string)

	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for bulk := range input {
				if err := h(bulk); err != nil {
					errs <- err
				}
			}
		}()
	}

	go func() {
		// This goroutine is the only one that closes errs, and it does so
		// strictly after its own last send. Closing errs from a separate
		// goroutine would race with the scan-error send below and could
		// either panic (send on closed channel) or lose the error.
		defer close(errs)

		scanner := bufio.NewScanner(r)
		bulk := make([]string, 0, batchSize)
		for scanner.Scan() {
			bulk = append(bulk, scanner.Text())
			if len(bulk) == batchSize {
				input <- bulk
				// Start a fresh slice: the one just sent now belongs to
				// a worker and must not be written to again.
				bulk = make([]string, 0, batchSize)
			}
		}
		if len(bulk) > 0 {
			input <- bulk
		}
		close(input) // no more work: lets the workers' range loops end
		wg.Wait()
		if err := scanner.Err(); err != nil {
			errs <- err
		}
	}()

	var err error
	for e := range errs {
		if err == nil && e != nil {
			err = e
		}
	}
	return err
}
// processSeq scans r line by line and calls h on each line in input order,
// on the calling goroutine. It stops at the first handler error and returns
// it; otherwise it returns the scanner's error, which is nil on a clean EOF.
func processSeq(r io.Reader, h func(string) error) error {
	sc := bufio.NewScanner(r)
	for {
		if !sc.Scan() {
			return sc.Err()
		}
		if err := h(sc.Text()); err != nil {
			return err
		}
	}
}
main_test.go
package main
import (
"strings"
"testing"
)
// Benchmark_run_line_short measures run_line_short (stats disabled) on 1000
// lines of 1000 "a" characters each.
func Benchmark_run_line_short(b *testing.B) {
	line := strings.Repeat("a", 1000) + "\n"
	input := strings.Repeat(line, 1000)
	for n := 0; n < b.N; n++ {
		run_line_short(input, false)
	}
}
// Benchmark_run_line_long measures run_line_long (stats disabled) on 1000
// lines of 1000 "a" characters each.
func Benchmark_run_line_long(b *testing.B) {
	line := strings.Repeat("a", 1000) + "\n"
	input := strings.Repeat(line, 1000)
	for n := 0; n < b.N; n++ {
		run_line_long(input, false)
	}
}
// Benchmark_run_line_short_workers measures run_line_short_workers (stats
// disabled) on 1000 lines of 1000 "a" characters each.
func Benchmark_run_line_short_workers(b *testing.B) {
	line := strings.Repeat("a", 1000) + "\n"
	input := strings.Repeat(line, 1000)
	for n := 0; n < b.N; n++ {
		run_line_short_workers(input, false)
	}
}
// Benchmark_run_line_long_workers measures run_line_long_workers (stats
// disabled) on 1000 lines of 1000 "a" characters each.
func Benchmark_run_line_long_workers(b *testing.B) {
	line := strings.Repeat("a", 1000) + "\n"
	input := strings.Repeat(line, 1000)
	for n := 0; n < b.N; n++ {
		run_line_long_workers(input, false)
	}
}
// Benchmark_run_bulk_short measures run_bulk_short (stats disabled) on 1000
// lines of 1000 "a" characters each.
func Benchmark_run_bulk_short(b *testing.B) {
	line := strings.Repeat("a", 1000) + "\n"
	input := strings.Repeat(line, 1000)
	for n := 0; n < b.N; n++ {
		run_bulk_short(input, false)
	}
}
// Benchmark_run_bulk_long measures run_bulk_long (stats disabled) on 1000
// lines of 1000 "a" characters each.
func Benchmark_run_bulk_long(b *testing.B) {
	line := strings.Repeat("a", 1000) + "\n"
	input := strings.Repeat(line, 1000)
	for n := 0; n < b.N; n++ {
		run_bulk_long(input, false)
	}
}
// Benchmark_run_seq_short measures run_seq_short (stats disabled) on 1000
// lines of 1000 "a" characters each.
func Benchmark_run_seq_short(b *testing.B) {
	line := strings.Repeat("a", 1000) + "\n"
	input := strings.Repeat(line, 1000)
	for n := 0; n < b.N; n++ {
		run_seq_short(input, false)
	}
}
// Benchmark_run_seq_long measures run_seq_long (stats disabled) on 1000
// lines of 1000 "a" characters each.
func Benchmark_run_seq_long(b *testing.B) {
	line := strings.Repeat("a", 1000) + "\n"
	input := strings.Repeat(line, 1000)
	for n := 0; n < b.N; n++ {
		run_seq_long(input, false)
	}
}
results
$ go run main.go
======================
run_line_short
time to run 2.747827ms
Alloc: 2 MB, TotalAlloc: 2 MB, Sys: 68 MB
Mallocs: 1378, Frees: 1
HeapAlloc: 2 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 1377
======================
run_line_long
time to run 1.30987804s
Alloc: 3 MB, TotalAlloc: 3 MB, Sys: 68 MB
Mallocs: 5619, Frees: 5
HeapAlloc: 3 MB, HeapSys: 63 MB, HeapIdle: 59 MB
HeapObjects: 5614
======================
run_line_short_workers
time to run 4.54926ms
Alloc: 1 MB, TotalAlloc: 4 MB, Sys: 68 MB
Mallocs: 6648, Frees: 5743
HeapAlloc: 1 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 905
======================
run_line_long_workers
time to run 1.29874118s
Alloc: 2 MB, TotalAlloc: 5 MB, Sys: 68 MB
Mallocs: 10670, Frees: 5747
HeapAlloc: 2 MB, HeapSys: 63 MB, HeapIdle: 60 MB
HeapObjects: 4923
======================
run_bulk_short
time to run 1.279059ms
Alloc: 3 MB, TotalAlloc: 6 MB, Sys: 68 MB
Mallocs: 11695, Frees: 5751
HeapAlloc: 3 MB, HeapSys: 63 MB, HeapIdle: 59 MB
HeapObjects: 5944
======================
run_bulk_long
time to run 31.328652ms
Alloc: 1 MB, TotalAlloc: 7 MB, Sys: 68 MB
Mallocs: 12728, Frees: 11361
HeapAlloc: 1 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 1367
======================
run_seq_short
time to run 956.991µs
Alloc: 3 MB, TotalAlloc: 8 MB, Sys: 68 MB
Mallocs: 13746, Frees: 11160
HeapAlloc: 3 MB, HeapSys: 63 MB, HeapIdle: 59 MB
HeapObjects: 2586
======================
run_seq_long
time to run 5.195705859s
Alloc: 1 MB, TotalAlloc: 9 MB, Sys: 68 MB
Mallocs: 17766, Frees: 15973
HeapAlloc: 1 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 1793
[mh-cbon@Host-001 bulk] $ go test -bench=. -benchmem -count=4
goos: linux
goarch: amd64
pkg: test/bulk
Benchmark_run_line_short-4 1000 1750824 ns/op 1029354 B/op 1005 allocs/op
Benchmark_run_line_short-4 1000 1747408 ns/op 1029348 B/op 1005 allocs/op
Benchmark_run_line_short-4 1000 1757826 ns/op 1029352 B/op 1005 allocs/op
Benchmark_run_line_short-4 1000 1758427 ns/op 1029352 B/op 1005 allocs/op
Benchmark_run_line_long-4 1 1303037704 ns/op 2253776 B/op 4075 allocs/op
Benchmark_run_line_long-4 1 1305074974 ns/op 2247792 B/op 4032 allocs/op
Benchmark_run_line_long-4 1 1305353658 ns/op 2246320 B/op 4013 allocs/op
Benchmark_run_line_long-4 1 1305725817 ns/op 2247792 B/op 4031 allocs/op
Benchmark_run_line_short_workers-4 1000 2148354 ns/op 1029366 B/op 1005 allocs/op
Benchmark_run_line_short_workers-4 1000 2139629 ns/op 1029370 B/op 1005 allocs/op
Benchmark_run_line_short_workers-4 1000 1983352 ns/op 1029359 B/op 1005 allocs/op
Benchmark_run_line_short_workers-4 1000 1909968 ns/op 1029363 B/op 1005 allocs/op
Benchmark_run_line_long_workers-4 1 1298321093 ns/op 2247856 B/op 4013 allocs/op
Benchmark_run_line_long_workers-4 1 1299846127 ns/op 2246384 B/op 4012 allocs/op
Benchmark_run_line_long_workers-4 1 1300003625 ns/op 2246288 B/op 4011 allocs/op
Benchmark_run_line_long_workers-4 1 1302779911 ns/op 2246256 B/op 4011 allocs/op
Benchmark_run_bulk_short-4 2000 704358 ns/op 1082154 B/op 1011 allocs/op
Benchmark_run_bulk_short-4 2000 708563 ns/op 1082147 B/op 1011 allocs/op
Benchmark_run_bulk_short-4 2000 714687 ns/op 1082148 B/op 1011 allocs/op
Benchmark_run_bulk_short-4 2000 705546 ns/op 1082156 B/op 1011 allocs/op
Benchmark_run_bulk_long-4 50 31411412 ns/op 1051497 B/op 1088 allocs/op
Benchmark_run_bulk_long-4 50 31513018 ns/op 1051544 B/op 1088 allocs/op
Benchmark_run_bulk_long-4 50 31539311 ns/op 1051502 B/op 1088 allocs/op
Benchmark_run_bulk_long-4 50 31564940 ns/op 1051505 B/op 1088 allocs/op
Benchmark_run_seq_short-4 2000 574346 ns/op 1028632 B/op 1002 allocs/op
Benchmark_run_seq_short-4 3000 572857 ns/op 1028464 B/op 1002 allocs/op
Benchmark_run_seq_short-4 2000 580493 ns/op 1028632 B/op 1002 allocs/op
Benchmark_run_seq_short-4 3000 572240 ns/op 1028464 B/op 1002 allocs/op
Benchmark_run_seq_long-4 1 5196313302 ns/op 2245792 B/op 4005 allocs/op
Benchmark_run_seq_long-4 1 5199995649 ns/op 2245792 B/op 4005 allocs/op
Benchmark_run_seq_long-4 1 5200460425 ns/op 2245792 B/op 4005 allocs/op
Benchmark_run_seq_long-4 1 5201080570 ns/op 2245792 B/op 4005 allocs/op
PASS
ok test/bulk 68.944s
notes: to my surprise, run_line_short_workers
is slightly slower than run_line_short
; I can't explain that result, but a deeper analysis using pprof should provide the answer.
All your example currently does is print the values asynchronously -- and not even that, as the print function has to synchronise printing to the output.
At the same time, you are not printing all the lines in the file. The main routine starts a lot of goroutines, but it doesn't wait for them to finish. Some will run, and some will not. To wait for the goroutines to finish, use a sync.WaitGroup
.
Here is an example. It might also fix your file descriptor problem.
wg := &sync.WaitGroup{}
scanner := bufio.NewScanner(file)
for scanner.Scan() {
text := scanner.Text()
wg.Add(1)
go func(t string) {
fmt.Println(t)
wg.Done()
}(text)
}
wg.Wait()
Note that the lines will not be processed in order! If you need to process them in order, but do not want to process them in the goroutine reading, you need a channel and a single goroutine processing them.
This helped with the "too many concurrent operations" error. I use a buffered channel as a semaphore to limit the number of concurrently running goroutines to 100,000.
package main
import (
"bufio"
"fmt"
"log"
"os"
"time"
)
// tokens is a counting semaphore: a goroutine may run only while it holds one
// of the 100000 slots in the channel's buffer.
var tokens = make(chan struct{}, 100000)

// main opens dump10.txt, scans it line by line and prints every line from its
// own goroutine, with at most cap(tokens) goroutines alive at once. It waits
// for all of them to finish, then prints the elapsed wall-clock time.
func main() {
	start := time.Now()
	file, err := os.Open("dump10.txt")
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		tokens <- struct{}{} // acquire a token; blocks while the limit is reached
		go func(text string) {
			defer func() { <-tokens }() // release the token even if printing panics
			fmt.Printf("%s\n", text)
		}(scanner.Text())
	}

	// Wait for the goroutines still in flight: once every slot can be taken
	// here, each worker has released its token. Without this, main could
	// return (ending the program) before the last lines are printed.
	for i := 0; i < cap(tokens); i++ {
		tokens <- struct{}{}
	}

	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
	secs := time.Since(start).Seconds()
	fmt.Printf("Took %.2fs", secs)
}