March 2, 2025

Golang Simple yet Useful Knowledge

go

Read the list of files

files, err := os.ReadDir(dir)
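
As a minimal sketch of how the returned entries might be used, the helper below collects the names of the non-directory entries. The name listFileNames is hypothetical and not part of the original code; os.ReadDir itself returns the entries sorted by filename.

```go
package main

import (
	"fmt"
	"os"
)

// listFileNames (hypothetical helper) returns the names of the
// non-directory entries found directly inside dir.
func listFileNames(dir string) ([]string, error) {
	entries, err := os.ReadDir(dir) // entries come back sorted by filename
	if err != nil {
		return nil, err
	}
	var names []string
	for _, entry := range entries {
		if entry.IsDir() {
			continue // skip sub-directories
		}
		names = append(names, entry.Name())
	}
	return names, nil
}

func main() {
	names, err := listFileNames(".")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, name := range names {
		fmt.Println(name)
	}
}
```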

Read a file

const FILES_DIR = "./files"
content, err := os.ReadFile(FILES_DIR + "/" + file.Name())
text := string(content)
words := strings.Fields(text) // splits the text on runs of whitespace, like the regex /\s+/
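
To make the behaviour of strings.Fields concrete, here is a small self-contained sketch. The helpers countWords and countWordsInFile, and the temporary file used in main, are assumptions made for illustration only.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// countWords splits text on runs of whitespace and counts the pieces.
func countWords(text string) int {
	return len(strings.Fields(text))
}

// countWordsInFile reads the whole file into memory, then counts its words.
func countWordsInFile(path string) (int, error) {
	content, err := os.ReadFile(path)
	if err != nil {
		return 0, err
	}
	return countWords(string(content)), nil
}

func main() {
	// Write a throwaway file so the sketch runs without any fixture.
	dir, err := os.MkdirTemp("", "words")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "a.txt")
	if err := os.WriteFile(path, []byte("hello  world\n\tfoo"), 0o644); err != nil {
		fmt.Println("error:", err)
		return
	}

	n, err := countWordsInFile(path)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(n) // prints 3: spaces, newline and tab all act as separators
}
```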

Wait for All Goroutines with the Unbuffered Channel Trick

Assume that we have the following project structure:

Let's dispatch 8 goroutines to count the number of words in each file:

Read a set of large files concurrently
func main() {
	start := time.Now()
	files := getFilesDirs(FILES_DIR)
	intChannel := make(chan int)

	for _, file := range files {
		go printCountOfWords(file, intChannel)
	}

	for i := 0; i < len(files); i++ {
		<-intChannel
	}

	elapsed := time.Since(start)
	fmt.Println("Time Taken:", elapsed)
}

func printCountOfWords(file fs.FileInfo, intChannel chan int) {
	content, err := os.ReadFile(FILES_DIR + "/" + file.Name())
	if err != nil {
		fmt.Println(err)
		intChannel <- 0 // still signal completion so main does not wait forever
		return
	}

	words := strings.Fields(string(content))
	numOfWords := len(words)
	fmt.Printf("%s has %d words \n", file.Name(), numOfWords)
	intChannel <- 0
}

Here is the trick:

  • By default make(chan int) creates an unbuffered channel, which means that every receive operation <-intChannel blocks until some goroutine sends a value.
  • Therefore each <-intChannel unblocks only when a goroutine executes intChannel <- 0 on finishing. Since main performs len(files) receives and every goroutine sends exactly once, main resumes only after all goroutines are done.
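
The trick can be reduced to a self-contained sketch that needs no files. This is a variation under stated assumptions, not the code above: the hypothetical helpers countWords and totalWords work on in-memory strings, and each goroutine sends its word count instead of a dummy 0, so the receive loop doubles as an aggregation step.

```go
package main

import (
	"fmt"
	"strings"
)

// countWords sends the word count of text on results when it is done.
func countWords(text string, results chan<- int) {
	results <- len(strings.Fields(text))
}

// totalWords starts one goroutine per text and waits for all of them by
// receiving exactly len(texts) values from an unbuffered channel.
func totalWords(texts []string) int {
	results := make(chan int) // unbuffered
	for _, text := range texts {
		go countWords(text, results)
	}

	total := 0
	for range texts {
		total += <-results // blocks until some goroutine sends
	}
	return total
}

func main() {
	texts := []string{"a b c", "d e", "f"}
	fmt.Println(totalWords(texts)) // prints 6
}
```

The number of receives must match the number of sends exactly: one receive too many blocks main forever, and one too few leaves a goroutine stuck on its send.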

Limit Concurrency with a Buffered Channel and Wait for Goroutines with sync.WaitGroup

Download a set of large files
Structs for Decoding an XML

We define the following structs in an attempt to decode the XML file from this link:

type Rss struct {
	Channel Channel `xml:"channel"`
}

type Channel struct {
	Title       string `xml:"title"`
	Description string `xml:"description"`
	Link        string `xml:"link"`
	Items       []Item `xml:"item"`
}

type Item struct {
	Title       string    `xml:"title"`
	Description string    `xml:"description"`
	Enclosure   Enclosure `xml:"enclosure"`
}

type Enclosure struct {
	URL    string `xml:"url,attr"`
	Length int    `xml:"length,attr"`
	Type   string `xml:"type,attr"`
}
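
To check how the struct tags map onto a feed, the sketch below decodes a tiny RSS document with xml.Unmarshal. The sampleFeed content and the parseFeed helper are made up for illustration; the real feed will contain many more elements, which the decoder simply ignores when no field matches.

```go
package main

import (
	"encoding/xml"
	"fmt"
)

type Rss struct {
	Channel Channel `xml:"channel"`
}

type Channel struct {
	Title string `xml:"title"`
	Items []Item `xml:"item"` // repeated <item> elements fill the slice
}

type Item struct {
	Title     string    `xml:"title"`
	Enclosure Enclosure `xml:"enclosure"`
}

type Enclosure struct {
	URL    string `xml:"url,attr"` // ",attr" reads an attribute, not a child element
	Length int    `xml:"length,attr"`
	Type   string `xml:"type,attr"`
}

// sampleFeed is a made-up, minimal RSS document.
const sampleFeed = `<rss version="2.0">
  <channel>
    <title>Demo Feed</title>
    <item>
      <title>Episode 1</title>
      <enclosure url="https://example.com/ep1.mp3" length="1234" type="audio/mpeg"/>
    </item>
    <item>
      <title>Episode 2</title>
      <enclosure url="https://example.com/ep2.mp3" length="5678" type="audio/mpeg"/>
    </item>
  </channel>
</rss>`

// parseFeed (hypothetical helper) decodes raw RSS bytes into an Rss value.
func parseFeed(data []byte) (Rss, error) {
	var rss Rss
	err := xml.Unmarshal(data, &rss)
	return rss, err
}

func main() {
	rss, err := parseFeed([]byte(sampleFeed))
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	for _, item := range rss.Channel.Items {
		fmt.Println(item.Title, item.Enclosure.URL, item.Enclosure.Length)
	}
}
```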
Main Program

First we define two constants:

const (
	maxDoanloads = 20
	maxRoutines  = 5
)

We will download 20 audio files, with at most 5 downloads running concurrently.

func main() {
	rssURL := "https://feeds.simplecast.com/qm_9xx0g"
	res, err := http.Get(rssURL)
	if err != nil {
		fmt.Println("Error: ", err)
		return
	}
	defer res.Body.Close()
	var rss Rss
	decoder := xml.NewDecoder(res.Body)
	err = decoder.Decode(&rss)
	if err != nil {
		fmt.Println("Error decoding RSS feed:", err)
		return
	}

	start := time.Now()

	var wg sync.WaitGroup
	semaphore := make(chan struct{}, maxRoutines)
  • Here we define a wait group wg; we increment its counter with wg.Add(1) right before dispatching each goroutine.
  • We deliberately use the empty struct struct{} as the element type of the semaphore channel. A struct{} value occupies zero bytes, so the buffer stores no payload; only the number of queued values matters.
	for index, item := range rss.Channel.Items {
		if index >= maxDoanloads {
			break
		}
		wg.Add(1)
		go func(item Item) {
			semaphore <- struct{}{}

			defer wg.Done()
			filename := item.Title + ".mp3"
			fmt.Println("Downloading ...:", filename)
			downloadPodcast(item.Enclosure.URL, filename)
			fmt.Println("Download Complete:", filename)

			<-semaphore
		}(item)
	}
	wg.Wait()
  • Each goroutine calls wg.Done() (deferred) to decrement the counter when it finishes.

  • wg.Wait() blocks the main goroutine until the counter drops to zero, that is, until every download has finished.

  • The pair semaphore <- struct{}{} and <-semaphore limits concurrency. The send semaphore <- struct{}{} blocks while the channel buffer is full, so at most maxRoutines downloads run at the same time.

  • Note that we pass item as an argument instead of defining the closure as go func(){ ... }() and capturing item from the enclosing scope. Goroutines do not necessarily start in the order they are launched, and before Go 1.22 a for loop reused a single item variable across all iterations.

    If every closure referred to that shared variable, there would be a data race: the loop keeps overwriting item while the goroutines read it, so several of them could end up downloading the same episode. Since Go 1.22 each iteration gets its own item, but passing it explicitly keeps the code correct on every version.
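
The semaphore and wait-group pattern can be isolated from the download logic. The following is a sketch under stated assumptions: runLimited, its task callback, and the peak-concurrency bookkeeping are hypothetical additions used only to make the limit observable; they are not part of the program above.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited runs task(i) for i in [0, n) with at most limit tasks active
// at once, and returns the highest number of tasks observed running together.
func runLimited(n, limit int, task func(i int)) int64 {
	var wg sync.WaitGroup
	semaphore := make(chan struct{}, limit)
	var running, peak int64

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) { // i is passed explicitly, as in the download loop
			defer wg.Done()
			semaphore <- struct{}{}        // acquire a slot; blocks while the buffer is full
			defer func() { <-semaphore }() // release the slot even if task panics

			cur := atomic.AddInt64(&running, 1)
			for { // record a new peak if this goroutine raised it
				old := atomic.LoadInt64(&peak)
				if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
					break
				}
			}
			task(i)
			atomic.AddInt64(&running, -1)
		}(i)
	}

	wg.Wait() // block until every goroutine has called Done
	return atomic.LoadInt64(&peak)
}

func main() {
	peak := runLimited(20, 5, func(i int) {
		time.Sleep(10 * time.Millisecond) // stand-in for a slow download
	})
	fmt.Println("peak concurrency:", peak) // never exceeds 5
}
```

Releasing the slot with defer is a small design choice: the slot is returned on every exit path, so an early return or a panic inside the task cannot leak capacity.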

We wrap up by counting the duration of execution:

	elapsed := time.Since(start)
	fmt.Printf("This code took %s to run.", elapsed)
}

Atomic Operations by Mutex Lock

We guard the target variable with a lock by wrapping it in a new struct together with a mutex:

type AtomicTotalWordCount struct {
	lock  sync.Mutex
	count int
}

func (a *AtomicTotalWordCount) Add(count int) {
	a.lock.Lock()
	defer a.lock.Unlock()
	a.count += count // read-modify-write, safe because the lock is held
}

Each Add operation is then effectively atomic: only one goroutine at a time can hold the lock, so concurrent calls cannot interleave their read and write of count, which rules out dirty reads and lost updates.
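
A short usage sketch shows the counter under contention. The Value accessor and the addConcurrently helper are hypothetical additions for this example; the struct and Add follow the definition above.

```go
package main

import (
	"fmt"
	"sync"
)

type AtomicTotalWordCount struct {
	lock  sync.Mutex
	count int
}

// Add increases the counter while holding the lock.
func (a *AtomicTotalWordCount) Add(count int) {
	a.lock.Lock()
	defer a.lock.Unlock()
	a.count += count
}

// Value (hypothetical accessor) reads the counter under the same lock.
func (a *AtomicTotalWordCount) Value() int {
	a.lock.Lock()
	defer a.lock.Unlock()
	return a.count
}

// addConcurrently starts n goroutines that each add 1 and returns the total.
func addConcurrently(n int) int {
	var total AtomicTotalWordCount
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			total.Add(1)
		}()
	}
	wg.Wait()
	return total.Value()
}

func main() {
	fmt.Println(addConcurrently(1000)) // prints 1000
}
```

Without the lock, the same loop incrementing a plain int would be a data race and could print less than 1000; running it with go run -race would report the conflict.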