package redis

import (
	"sync"
	"testing"
	"time"

	"github.com/gomodule/redigo/redis"
	"github.com/rafaeljusto/redigomock/v3"
	"github.com/stretchr/testify/require"
)

const (
	// runnerKey is the Redis key watched by every test in this file.
	runnerKey = "runner:build_queue:10"
)

// createSubscriptionMessage builds the three-element pub/sub reply
// ("message", channel name, payload) that is fed to the mock connection to
// simulate a published message.
func createSubscriptionMessage(key, data string) []interface{} {
	return []interface{}{
		[]byte("message"),
		[]byte(key),
		[]byte(data),
	}
}

// createSubscribeMessage builds the three-element reply ("subscribe",
// channel name, "1") that the mock connection returns for a SUBSCRIBE command.
func createSubscribeMessage(key string) []interface{} {
	return []interface{}{
		[]byte("subscribe"),
		[]byte(key),
		[]byte("1"),
	}
}

// createUnsubscribeMessage builds the three-element reply ("unsubscribe",
// channel name, "1") that the mock connection returns for an UNSUBSCRIBE
// command.
func createUnsubscribeMessage(key string) []interface{} {
	return []interface{}{
		[]byte("unsubscribe"),
		[]byte(key),
		[]byte("1"),
	}
}

// countSubscribers returns how many subscribers are currently registered for
// key. It takes kw.mu, so it is safe to call while watchers come and go.
func (kw *KeyWatcher) countSubscribers(key string) int {
	kw.mu.Lock()
	defer kw.mu.Unlock()

	return len(kw.subscribers[key])
}

// processMessages runs kw.receivePubSubStream against a mock pub/sub
// connection that carries a single message with payload value on the channel
// for runnerKey.
//
// The sequence is:
//  1. start receivePubSubStream in a goroutine;
//  2. wait until kw.conn has been set, then close ready so the callers'
//     watcher goroutines may start calling WatchKey;
//  3. wait until exactly numWatchers subscribers are registered for runnerKey;
//  4. close psc.ReceiveNow to release the mock's blocked Receive (the mock is
//     configured with ReceiveWait = true);
//  5. require that receivePubSubStream returned without error.
//
// It uses require and therefore must be called from the goroutine running the
// test.
func (kw *KeyWatcher) processMessages(t *testing.T, numWatchers int, value string, ready chan<- struct{}) {
	t.Helper()

	psc := redigomock.NewConn()
	psc.ReceiveWait = true

	channel := channelPrefix + runnerKey
	psc.Command("SUBSCRIBE", channel).Expect(createSubscribeMessage(channel))
	psc.Command("UNSUBSCRIBE", channel).Expect(createUnsubscribeMessage(channel))
	psc.AddSubscriptionMessage(createSubscriptionMessage(channel, value))

	// Buffered so the receiver goroutine can always hand over its result and
	// exit, even when one of the assertions below aborts the test before errC
	// is read.
	errC := make(chan error, 1)
	go func() { errC <- kw.receivePubSubStream(psc) }()

	require.Eventually(t, func() bool {
		kw.mu.Lock()
		defer kw.mu.Unlock()

		return kw.conn != nil
	}, time.Second, time.Millisecond)
	close(ready)

	require.Eventually(t, func() bool {
		return kw.countSubscribers(runnerKey) == numWatchers
	}, time.Second, time.Millisecond)
	close(psc.ReceiveNow)

	require.NoError(t, <-errC)
}

// keyChangeTestCase describes one table-driven scenario for WatchKey.
type keyChangeTestCase struct {
	// desc is used as the subtest name.
	desc string
	// returnValue is what the mocked GET for runnerKey returns when
	// isKeyMissing is false.
	returnValue string
	// isKeyMissing makes the mocked GET for runnerKey fail with redis.ErrNil.
	isKeyMissing bool
	// watchValue is the value passed to WatchKey.
	watchValue string
	// processedValue is the payload handed to processMessages; tests that do
	// not call processMessages ignore it.
	processedValue string
	// expectedStatus is the status WatchKey must return.
	expectedStatus WatchKeyStatus
	// timeout is the WatchKey timeout; only TestKeyChangesInstantReturn reads
	// it, the other tests use a fixed time.Second.
	timeout time.Duration
}

// TestKeyChangesInstantReturn covers the scenarios in which WatchKey returns
// without any pub/sub message being processed: the key already differs from
// the watched value, or nothing happens and the call times out.
func TestKeyChangesInstantReturn(t *testing.T) {
	testCases := []keyChangeTestCase{
		// WatchKeyStatusAlreadyChanged
		{
			desc:           "sees change with key existing and changed",
			returnValue:    "somethingelse",
			watchValue:     "something",
			expectedStatus: WatchKeyStatusAlreadyChanged,
			timeout:        time.Second,
		},
		{
			desc:           "sees change with key non-existing",
			isKeyMissing:   true,
			watchValue:     "something",
			processedValue: "somethingelse",
			expectedStatus: WatchKeyStatusAlreadyChanged,
			timeout:        time.Second,
		},
		// WatchKeyStatusTimeout
		{
			desc:           "sees timeout with key existing and unchanged",
			returnValue:    "something",
			watchValue:     "something",
			expectedStatus: WatchKeyStatusTimeout,
			timeout:        time.Millisecond,
		},
		{
			desc:           "sees timeout with key non-existing and unchanged",
			isKeyMissing:   true,
			watchValue:     "",
			expectedStatus: WatchKeyStatusTimeout,
			timeout:        time.Millisecond,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			conn, td := setupMockPool()
			defer td()

			if tc.isKeyMissing {
				conn.Command("GET", runnerKey).ExpectError(redis.ErrNil)
			} else {
				conn.Command("GET", runnerKey).Expect(tc.returnValue)
			}

			kw := NewKeyWatcher()
			defer kw.Shutdown()
			// No receive loop runs in this test, so a pub/sub connection is
			// installed by hand.
			kw.conn = &redis.PubSubConn{Conn: redigomock.NewConn()}

			val, err := kw.WatchKey(runnerKey, tc.watchValue, tc.timeout)

			require.NoError(t, err, "Expected no error")
			require.Equal(t, tc.expectedStatus, val, "Expected value")
		})
	}
}

// TestKeyChangesWhenWatching starts a single watcher and then delivers one
// pub/sub message through processMessages, checking the status WatchKey
// reports for it.
func TestKeyChangesWhenWatching(t *testing.T) {
	testCases := []keyChangeTestCase{
		// WatchKeyStatusSeenChange
		{
			desc:           "sees change with key existing",
			returnValue:    "something",
			watchValue:     "something",
			processedValue: "somethingelse",
			expectedStatus: WatchKeyStatusSeenChange,
		},
		{
			desc:           "sees change with key non-existing, when watching empty value",
			isKeyMissing:   true,
			watchValue:     "",
			processedValue: "something",
			expectedStatus: WatchKeyStatusSeenChange,
		},
		// WatchKeyStatusNoChange
		{
			desc:           "sees no change with key existing",
			returnValue:    "something",
			watchValue:     "something",
			processedValue: "something",
			expectedStatus: WatchKeyStatusNoChange,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			conn, td := setupMockPool()
			defer td()

			if tc.isKeyMissing {
				conn.Command("GET", runnerKey).ExpectError(redis.ErrNil)
			} else {
				conn.Command("GET", runnerKey).Expect(tc.returnValue)
			}

			kw := NewKeyWatcher()
			defer kw.Shutdown()

			wg := &sync.WaitGroup{}
			wg.Add(1)
			ready := make(chan struct{})

			// The watcher goroutine only records its result. The require
			// assertions run below on the test goroutine, because require
			// fails through t.FailNow, which must not be called from any
			// other goroutine.
			var (
				status   WatchKeyStatus
				watchErr error
			)

			go func() {
				defer wg.Done()
				<-ready
				status, watchErr = kw.WatchKey(runnerKey, tc.watchValue, time.Second)
			}()

			kw.processMessages(t, 1, tc.processedValue, ready)
			wg.Wait()

			require.NoError(t, watchErr, "Expected no error")
			require.Equal(t, tc.expectedStatus, status, "Expected value")
		})
	}
}

// TestKeyChangesParallel starts 100 concurrent watchers on the same key and
// checks that a single pub/sub message delivered through processMessages is
// reported to every one of them.
func TestKeyChangesParallel(t *testing.T) {
	testCases := []keyChangeTestCase{
		{
			desc:           "massively parallel, sees change with key existing",
			returnValue:    "something",
			watchValue:     "something",
			processedValue: "somethingelse",
			expectedStatus: WatchKeyStatusSeenChange,
		},
		{
			desc:           "massively parallel, sees change with key existing, watching missing keys",
			isKeyMissing:   true,
			watchValue:     "",
			processedValue: "somethingelse",
			expectedStatus: WatchKeyStatusSeenChange,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			runTimes := 100

			conn, td := setupMockPool()
			defer td()

			// Queue one GET expectation per watcher.
			getCmd := conn.Command("GET", runnerKey)

			for i := 0; i < runTimes; i++ {
				if tc.isKeyMissing {
					getCmd = getCmd.ExpectError(redis.ErrNil)
				} else {
					getCmd = getCmd.Expect(tc.returnValue)
				}
			}

			wg := &sync.WaitGroup{}
			wg.Add(runTimes)
			ready := make(chan struct{})

			kw := NewKeyWatcher()
			defer kw.Shutdown()

			// Each watcher writes only to its own slot, so no extra locking is
			// needed. The results are asserted on the test goroutine after
			// wg.Wait, because require must not be used from other goroutines.
			statuses := make([]WatchKeyStatus, runTimes)
			errs := make([]error, runTimes)

			for i := 0; i < runTimes; i++ {
				go func(i int) {
					defer wg.Done()
					<-ready
					statuses[i], errs[i] = kw.WatchKey(runnerKey, tc.watchValue, time.Second)
				}(i)
			}

			kw.processMessages(t, runTimes, tc.processedValue, ready)
			wg.Wait()

			for i := 0; i < runTimes; i++ {
				require.NoError(t, errs[i], "Expected no error")
				require.Equal(t, tc.expectedStatus, statuses[i], "Expected value")
			}
		})
	}
}

// TestShutdown checks that Shutdown releases a watcher that is already
// blocked in WatchKey, and that a WatchKey call made after Shutdown returns
// promptly; both are expected to report WatchKeyStatusNoChange.
func TestShutdown(t *testing.T) {
	conn, td := setupMockPool()
	defer td()

	kw := NewKeyWatcher()
	kw.conn = &redis.PubSubConn{Conn: redigomock.NewConn()}
	defer kw.Shutdown()

	conn.Command("GET", runnerKey).Expect("something")

	wg := &sync.WaitGroup{}
	wg.Add(1)

	// The watcher goroutine only records its result; it is asserted on the
	// test goroutine after wg.Wait.
	var (
		blockedStatus WatchKeyStatus
		blockedErr    error
	)

	go func() {
		defer wg.Done()
		blockedStatus, blockedErr = kw.WatchKey(runnerKey, "something", 10*time.Second)
	}()

	// Wait for the watcher to register, then shut down. Doing this on the test
	// goroutine keeps require off other goroutines and guarantees Shutdown is
	// only skipped when the test is already being aborted.
	require.Eventually(t, func() bool {
		return kw.countSubscribers(runnerKey) == 1
	}, 10*time.Second, time.Millisecond)

	kw.Shutdown()

	wg.Wait()

	require.NoError(t, blockedErr, "Expected no error")
	require.Equal(t, WatchKeyStatusNoChange, blockedStatus, "Expected value not to change")

	require.Eventually(t, func() bool {
		return kw.countSubscribers(runnerKey) == 0
	}, 10*time.Second, time.Millisecond)

	// Adding a key after the shutdown should result in an immediate response
	var (
		lateStatus WatchKeyStatus
		lateErr    error
	)
	done := make(chan struct{})

	go func() {
		lateStatus, lateErr = kw.WatchKey(runnerKey, "something", 10*time.Second)
		close(done)
	}()

	select {
	case <-done:
		require.NoError(t, lateErr, "Expected no error")
		require.Equal(t, WatchKeyStatusNoChange, lateStatus, "Expected value not to change")
	case <-time.After(100 * time.Millisecond):
		t.Fatal("timeout waiting for WatchKey")
	}
}