Let say, an array in which each element holds a random countdown value, so when its time ended, it needs to spawn a worker (i.e. go-routine) as soon as possible. Luckily, in Go, we can create a custom type variable and benefit from the go-routine itself. For this tutorial, we’re going to have an array that has an observer to monitor its values.

1
2
var theArray []string
theArray = append(theArray,"Hello, There!")

The idea, every time a value is added to the observed variable theArray, the observer could immediately take action in real-time, as long as the main thread has not ended yet.

Rather than using the default type []string, let’s make a custom type called observee so later we can freely extend the type.

1
2
3
4
5
6
7
type observee []string
//initialize
var theArray observee
//start the observer
theArray.observer()
//append array
theArray = append(theArray,"Hi! I'm the one to be observed")

After the theArray initialized, we then call the observer function to start observing, which extended from the observee type itself.

1
2
3
4
5
type observee []string
func (o *observee) observer(fn func(observee)){
    //Do something!!!
    fn(*o)
}

As you noticed that I’m using a pointer *o, feel free to remove the pointer and see what happened. If we run the code below, the observer will print and monitor the values of the theArray after its appended.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func main() {
    var theArray observee
    //empty values
    theArray.observer(func(o observee){
        fmt.Println(o)
    })
    //append 1
    theArray = append(theArray,"Hi! it's me")
    theArray.observer(func(o observee){
        fmt.Println(o)
    })
    //append 2
    theArray = append(theArray,"Hi! another me")
    theArray.observer(func(o observee){
        fmt.Println(o)
    })
}
1
2
3
[]
[Hi! it's me]
[Hi! it's me Hi! another me]

Furthermore, to make the observation happen in real-time we need to create the observer’s routine. In other words, the observer needs to have its lifespan and continuously giving information to its caller.

But before that, let’s create the worker first, theArray value will hold detail about when will the worker launched, and because all the workers need to finish together then it needs to be synchronized.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
type work struct {
    wg *sync.WaitGroup
}

func (w *work) countdown(workerID int) string {
    // return "[worker id]-[time(sec) to launch]-[timestamp added]"
    return fmt.Sprintf("%d-%d-%d", workerID, rand.Intn(5), time.Now().UnixNano())
}
func (w *work) ready(v string) (launchID int, ok bool) {
    id, tx, ts := w.data(v)
    if id > 0 && time.Now().Sub(time.Unix(0, ts)).Seconds() > float64(tx) {
        launchID = id
        ok = true
    }
    return
}
func (w *work) launch(i int) {
    w.wg.Add(1)
    go func(i int, wg *sync.WaitGroup) {
        defer wg.Done()
        fmt.Printf("worker %d is launched\n", i)
    }(i, w.wg)
}
func (w *work) data(v string) (id int, tx int, ts int64) {
    if v == "" {
        return
    }
    s := strings.Split(v, "-")
    id, _ = strconv.Atoi(string(s[0]))
    tx, _ = strconv.Atoi(string(s[1]))
    ts, _ = strconv.ParseInt(string(s[2]), 10, 64)
    return
}

And for the observee we modified a little bit so it will loop through each element of theArray and send information to caller function such as the array index and the value,

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
type observee []string
func (o *observee) observer(fn func(int, string, *sync.Mutex)) *sync.Mutex {
    var mu sync.Mutex
    //Start a go routine
    go func(mu *sync.Mutex) {
        //Forever loop through each array element
        nextIdx := 0
        for {
            if x := *o; x != nil {
                fn(nextIdx, x[nextIdx], mu)
                nextIdx++
                if nextIdx == len(x) {
                    nextIdx = 0
                }
            }
        }
    }(&mu)
    return &mu
}

Ok then, let’s try it.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func main() {
    var theArray observee
    var wg sync.WaitGroup
    worker := &work{
        wg: &wg,
    }
    //Observe and launch worker
    theArray.observer(func(i int, v string, mu *sync.Mutex) {
        if id, ok := worker.ready(v); ok {
            theArray[i] = ""
            worker.launch(id)
        }
    })
    //Append workers to array
    for i := 1; i <= 10; i++ {
        theArray = append(theArray, worker.countdown(i))
    }

    wg.Wait()
    time.Sleep(5 * time.Second)
}

If we run the code above, the observer will continuously try to launch a worker from theArray. However, running a go-routine without knowing how to stop is not a good idea. So, what if we create a more controllable observer? In which we can restart/kill the observer and modify the observe duration like a ticker.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type observee []string

func (o *observee) observer(fn func(int, string, *sync.Mutex), td time.Duration) (*sync.Mutex, *time.Ticker) {
    var mu sync.Mutex
    t := time.NewTicker(td)
    //Start a go routine
    go func(mu *sync.Mutex) {
        //Forever loop through each array element
        nextIdx := 0
        for {
            select {
            case <-t.C:
                if x := *o; x != nil {
                    fn(nextIdx, x[nextIdx], mu)
                    nextIdx++
                    if nextIdx == len(x) {
                        nextIdx = 0
                    }
                }
            }
        }
    }(&mu)

    return &mu, t
}

The code below will simulate how is the ticker stops the observer and then continued as we set manually.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func main() {
    var theArray observee
    var wg sync.WaitGroup
    worker := &work{
        wg: &wg,
    }

    _, t := theArray.observer(func(i int, v string, mu *sync.Mutex) {
        if id, ok := worker.ready(v); ok {
            theArray[i] = ""
            worker.launch(id)
        }
    }, 1*time.Nanosecond) //observing each element per 1 Nanosecond

    for i := 1; i <= 10; i++ {
        theArray = append(theArray, worker.countdown(i))
    }

    wg.Wait()

    time.Sleep(1 * time.Second)
    t.Stop()
    fmt.Println("Stop observer, and wait for 5 secs\nno worker will launched\n")
    time.Sleep(5 * time.Second)

    fmt.Println("Ticker reset, continue observing")
    t.Reset(1 * time.Nanosecond)

    time.Sleep(3 * time.Second)

}

Run the code,

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
$ go run 
worker 8 is launched
worker 7 is launched
worker 10 is launched
worker 1 is launched
worker 5 is launched
worker 9 is launched
Stop observer, and wait for 5 secs
no worker will launched

Ticker reset, continue observing
worker 3 is launched
worker 2 is launched
worker 4 is launched
worker 6 is launched

Hey, wait a sec! If we could reset and resume the ticker, then it means that the go-routine wasn’t actually killed, right? the go-routine will be alive until the main thread is shutdown. Let’s modified the code so we can kill the observer before the main thread is ended.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type observee []string

func (o *observee) observer(fn func(int, string, *sync.Mutex), td time.Duration) (*sync.Mutex, chan<- struct{}) {
    var mu sync.Mutex
    done := make(chan struct{})
    //Start a go routine
    go func(mu *sync.Mutex) {
        //Forever loop through each array element
        nextIdx := 0
        for {
            select {
            case <-done:
                return
            case <-time.NewTicker(td).C:
                if x := *o; x != nil {
                    fn(nextIdx, x[nextIdx], mu)
                    nextIdx++
                    if nextIdx == len(x) {
                        nextIdx = 0
                    }
                }
            }
        }
    }(&mu)

    return &mu, done
}

So instead of using the ticker, we’re going to use a done channel to kill the observer,

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func main() {
    var theArray observee
    var wg sync.WaitGroup
    worker := &work{
        wg: &wg,
    }

    _, t := theArray.observer(func(i int, v string, mu *sync.Mutex) {
        mu.Lock() //Lock the process so there will be no leftover
        if id, ok := worker.ready(v); ok {
            theArray[i] = ""
            worker.launch(id)
        }
        mu.Unlock()
    }, 1*time.Nanosecond) //observing each element per 1 Nanosecond

    for i := 1; i <= 10; i++ {
        theArray = append(theArray, worker.countdown(i))
    }

    wg.Wait()

    time.Sleep(1 * time.Second)
    t <- struct{}{}
    fmt.Println("Stop observer, and wait for 5 secs\nno worker will launched\n")
    time.Sleep(5 * time.Second)

    fmt.Println("Append another worker to array, there will be no observer to launch the worker")
    for i := 11; i <= 13; i++ {
        theArray = append(theArray, worker.countdown(i))
    }

    time.Sleep(5 * time.Second)

    fmt.Println("Failed to launch:")
    fmt.Println(theArray)

}

Run the code, as you see, the observer doesn’t launch any worker when the appending is resumed.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
$ go run
worker 10 is launched
worker 8 is launched
worker 7 is launched
worker 9 is launched
worker 1 is launched
Stop observer, and wait for 5 secs
no worker will launched

Append another worker to array, there will be no observer to launch the worker
Failed to launch:
[ 2-2-1615865175011360000 3-2-1615865175011362000 4-4-1615865175011362000 5-1-1615865175011363000 6-3-1615865175011373000 11-4-1615865181014639000 12-1-1615865181014644000 13-2-1615865181014645000]

Ok, we’re not done yet, as you see that the observer is only listening to changes that occurred from the main thread, the func main(), What if the observer must listen to changes from multiple routines? Let’s give it a try.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func main() {
    var theArray observee
    var wg sync.WaitGroup
    worker := &work{
        wg: &wg,
    }

    mu, t := theArray.observer(func(i int, v string, mu *sync.Mutex) {
        mu.Lock()
        if id, ok := worker.ready(v); ok {
            theArray[i] = ""
            worker.launch(id)
        }
        mu.Unlock()
    }, 1*time.Nanosecond) //observing each element per 1 Nanosecond

    for i := 1; i <= 10; i++ {
        go func(mu *sync.Mutex, i int) {
            mu.Lock()
            theArray = append(theArray, worker.countdown(i))
            mu.Unlock()
        }(mu, i)
    }

    wg.Wait()

    time.Sleep(5 * time.Second)
    t <- struct{}{}

}

Run the code I guess it went well too, as you see that I’m using mu.Lock() and mu.Unlock() before and after appending theArray feel free to comment those lines and see what happened.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
$ go run 
worker 2 is launched
worker 6 is launched
worker 4 is launched
worker 9 is launched
worker 5 is launched
worker 3 is launched
worker 10 is launched
worker 7 is launched
worker 1 is launched
worker 8 is launched

Ok, that’s a wrap. If you have any questions, suggestions or if I made a mistake, don’t hesitate to poke me on Twitter or email.