Read the list of files
```go
files, err := os.ReadDir(dir)
```
Read a file
```go
const FILES_DIR = "./files"

content, err := os.ReadFile(filepath.Join(FILES_DIR, file.Name()))
text := string(content)
words := strings.Fields(text) // splits text around runs of whitespace, much like /\s+/, dropping leading/trailing whitespace
```
Awaiting All Goroutines with the Unbuffered Channel Trick
Read a set of large files concurrently
Assume that we have the following project structure:
Let's dispatch 8 goroutines, one per file, to count the number of words in each file:
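```go
package main

import (
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"strings"
	"time"
)

func main() {
	start := time.Now()
	files := getFilesDirs(FILES_DIR) // FILES_DIR is the constant defined above
	intChannel := make(chan int)     // unbuffered: used only as a "done" signal
	for _, file := range files {
		go printCountOfWords(file, intChannel)
	}
	// One receive per goroutine; each receive blocks until some goroutine sends.
	for i := 0; i < len(files); i++ {
		<-intChannel
	}
	elapsed := time.Since(start)
	fmt.Println("Time Taken:", elapsed)
}

func printCountOfWords(file fs.DirEntry, intChannel chan<- int) {
	// Always signal completion, even on error; otherwise main would block forever.
	defer func() { intChannel <- 0 }()
	content, err := os.ReadFile(filepath.Join(FILES_DIR, file.Name()))
	if err != nil {
		fmt.Println(err)
		return
	}
	words := strings.Fields(string(content))
	fmt.Printf("%s has %d words\n", file.Name(), len(words))
}

// getFilesDirs is not shown in the original; this version assumes it wraps
// os.ReadDir as in "Read the list of files" above.
func getFilesDirs(dir string) []fs.DirEntry {
	files, err := os.ReadDir(dir)
	if err != nil {
		fmt.Println(err)
		return nil
	}
	return files
}
```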
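Here is the trick:
- By default, `make(chan int)` creates an unbuffered channel, which means every receive operation `<-intChannel` blocks until some goroutine sends a value (and every send blocks until a receiver takes it).
- Each goroutine performs exactly one send, `intChannel <- 0`, when it has finished, and `main` performs exactly `len(files)` receives. Therefore the receive loop can only complete after every goroutine has sent its value, so `main` waits for all of them before measuring the elapsed time.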
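The same trick can carry results instead of a dummy `0`. In the self-contained sketch below (the function `totalWords` and its inputs are illustrative, not from the original), each goroutine sends its word count over an unbuffered channel, and the caller receives exactly one value per goroutine, which both waits for all of them and collects their results.

```go
package main

import (
	"fmt"
	"strings"
)

// totalWords is an illustrative example: it starts one goroutine per text,
// each goroutine sends that text's word count on an unbuffered channel, and
// the caller performs exactly len(texts) receives. The receive loop cannot
// finish until every goroutine has sent, so no other synchronization is needed.
func totalWords(texts []string) int {
	results := make(chan int) // unbuffered
	for _, text := range texts {
		go func(t string) {
			results <- len(strings.Fields(t))
		}(text)
	}
	total := 0
	for i := 0; i < len(texts); i++ {
		total += <-results
	}
	return total
}

func main() {
	texts := []string{"the quick brown fox", "jumps over", "the lazy dog"}
	fmt.Println("Total words:", totalWords(texts)) // prints "Total words: 9"
}
```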
Limiting Concurrency with a Buffered Channel and Awaiting Goroutines with sync.WaitGroup
Download a set of large files
Structs for Decoding the XML Feed
We define the following structs in an attempt to decode the RSS (XML) feed at https://feeds.simplecast.com/qm_9xx0g:
```go
type Rss struct {
	Channel Channel `xml:"channel"`
}

type Channel struct {
	Title       string `xml:"title"`
	Description string `xml:"description"`
	Link        string `xml:"link"`
	Items       []Item `xml:"item"`
}

type Item struct {
	Title       string    `xml:"title"`
	Description string    `xml:"description"`
	Enclosure   Enclosure `xml:"enclosure"`
}

type Enclosure struct {
	URL    string `xml:"url,attr"`
	Length int    `xml:"length,attr"`
	Type   string `xml:"type,attr"`
}
```
Main Program
First we import the packages the program uses and define two constants:
```go
package main

import (
	"encoding/xml"
	"fmt"
	"net/http"
	"sync"
	"time"
	// downloadPodcast (not shown) may need further imports.
)

const (
	maxDownloads = 20
	maxRoutines  = 5
)
```
We will download the audio files of at most 20 episodes, running at most 5 downloads concurrently.
```go
func main() {
	rssURL := "https://feeds.simplecast.com/qm_9xx0g"
	res, err := http.Get(rssURL)
	if err != nil {
		fmt.Println("Error fetching RSS feed:", err)
		return
	}
	defer res.Body.Close()

	var rss Rss
	decoder := xml.NewDecoder(res.Body)
	err = decoder.Decode(&rss)
	if err != nil {
		fmt.Println("Error decoding RSS feed:", err)
		return
	}

	start := time.Now()

	var wg sync.WaitGroup
	semaphore := make(chan struct{}, maxRoutines)
```
- Here we define a wait group `wg`; we increment its counter with `wg.Add(1)` right before we dispatch each goroutine.
- We intentionally create `semaphore` as a buffered channel of the empty struct `struct{}` with capacity `maxRoutines`. Because `struct{}` has size zero, the buffered elements occupy no memory; the channel is used purely for signaling, not for passing data.
```go
	for index, item := range rss.Channel.Items {
		if index >= maxDownloads {
			break
		}
		wg.Add(1)
		go func(item Item) {
			defer wg.Done()
			semaphore <- struct{}{}        // acquire a slot; blocks while maxRoutines downloads are running
			defer func() { <-semaphore }() // release the slot when this download ends

			filename := item.Title + ".mp3"
			fmt.Println("Downloading ...:", filename)
			downloadPodcast(item.Enclosure.URL, filename)
			fmt.Println("Download Complete:", filename)
		}(item)
	}
	wg.Wait()
```
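- We call `wg.Done()` inside each goroutine to decrement the counter when its download finishes.
- We call `wg.Wait()` to block the main goroutine until the counter drops back to zero, that is, until every dispatched download has completed.
- We use the pair `semaphore <- struct{}{}` and `<-semaphore` to limit concurrency. The send `semaphore <- struct{}{}` blocks while the channel's buffer is full, so at most `maxRoutines` goroutines can hold a slot at any time; each `<-semaphore` frees a slot for the next waiting goroutine. This caps the number of simultaneous downloads; it is not a rate limit in the requests-per-second sense.
- Note that, before Go 1.22, we could not simply write the closure as `go func() { ... }()` and capture `item` from the enclosing scope. The loop variable `item` was a single variable shared by all iterations, and a goroutine does not necessarily run before the loop moves on. By the time a goroutine read `item`, the loop may already have overwritten it with a later element, so several goroutines could download the same episode while others were skipped, and the concurrent read and write of `item` is a data race. Passing `item` as an argument copies its current value into each goroutine. Since Go 1.22 each iteration gets its own `item`, so capturing it directly is also safe, but passing it explicitly works on every Go version.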
We wrap up by measuring the total execution time:
```go
	elapsed := time.Since(start)
	fmt.Printf("This code took %s to run.\n", elapsed)
}
```
Atomic Operations by Mutex Lock
We simply protect the target variable with a lock by wrapping it in a new struct together with a `sync.Mutex`:
```go
type AtomicTotalWordCount struct {
	lock  sync.Mutex
	count int
}

func (a *AtomicTotalWordCount) Add(count int) {
	a.lock.Lock()
	defer a.lock.Unlock()
	tmpTotalWordCount := a.count + count
	a.count = tmpTotalWordCount
}
```
Then each `Add` operation is atomic: only one goroutine can hold the lock at a time, so no two goroutines can interleave their read-modify-write of `count`, which would otherwise cause dirty reads and lost writes.