Commit 1e92a032 authored by Jacob Vosmaer's avatar Jacob Vosmaer

Improve log messages

parent c805ae8c
...@@ -72,13 +72,13 @@ func processInner(conn redis.Conn) error { ...@@ -72,13 +72,13 @@ func processInner(conn redis.Conn) error {
dataStr := string(v.Data) dataStr := string(v.Data)
msg := strings.SplitN(dataStr, "=", 2) msg := strings.SplitN(dataStr, "=", 2)
if len(msg) != 2 { if len(msg) != 2 {
helper.LogError(nil, fmt.Errorf("Redis receive error: got an invalid notification: %q", dataStr)) helper.LogError(nil, fmt.Errorf("keywatcher: invalid notification: %q", dataStr))
continue continue
} }
key, value := msg[0], msg[1] key, value := msg[0], msg[1]
notifyChanWatchers(key, value) notifyChanWatchers(key, value)
case error: case error:
helper.LogError(nil, fmt.Errorf("Redis receive error: %s", v)) helper.LogError(nil, fmt.Errorf("keywatcher: pubsub receive: %v", v))
// Intermittent error, return nil so that it doesn't wait before reconnect // Intermittent error, return nil so that it doesn't wait before reconnect
return nil return nil
} }
...@@ -105,21 +105,18 @@ func dialPubSub(dialer redisDialerFunc) (redis.Conn, error) { ...@@ -105,21 +105,18 @@ func dialPubSub(dialer redisDialerFunc) (redis.Conn, error) {
// //
// NOTE: There Can Only Be One! // NOTE: There Can Only Be One!
func Process() { func Process() {
log.Print("Processing redis queue") log.Print("keywatcher: starting process loop")
for { for {
log.Println("Connecting to redis")
conn, err := dialPubSub(workerDialFunc) conn, err := dialPubSub(workerDialFunc)
if err != nil { if err != nil {
helper.LogError(nil, fmt.Errorf("Failed to connect to redis: %s", err)) helper.LogError(nil, fmt.Errorf("keywatcher: %v", err))
time.Sleep(redisReconnectTimeout.Duration()) time.Sleep(redisReconnectTimeout.Duration())
continue continue
} }
redisReconnectTimeout.Reset() redisReconnectTimeout.Reset()
if err = processInner(conn); err != nil { if err = processInner(conn); err != nil {
helper.LogError(nil, fmt.Errorf("Failed to process redis-queue: %s", err)) helper.LogError(nil, fmt.Errorf("keywatcher: process loop: %v", err))
} }
} }
} }
...@@ -187,7 +184,7 @@ func WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error) ...@@ -187,7 +184,7 @@ func WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error)
currentValue, err := GetString(key) currentValue, err := GetString(key)
if err != nil { if err != nil {
return WatchKeyStatusNoChange, fmt.Errorf("Failed to get value from Redis: %#v", err) return WatchKeyStatusNoChange, fmt.Errorf("keywatcher: redis GET: %v", err)
} }
if currentValue != value { if currentValue != value {
return WatchKeyStatusAlreadyChanged, nil return WatchKeyStatusAlreadyChanged, nil
...@@ -196,7 +193,7 @@ func WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error) ...@@ -196,7 +193,7 @@ func WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error)
select { select {
case currentValue := <-kw.Chan: case currentValue := <-kw.Chan:
if currentValue == "" { if currentValue == "" {
return WatchKeyStatusNoChange, fmt.Errorf("Failed to get value from Redis") return WatchKeyStatusNoChange, fmt.Errorf("keywatcher: redis GET failed")
} }
if currentValue == value { if currentValue == value {
return WatchKeyStatusNoChange, nil return WatchKeyStatusNoChange, nil
......
...@@ -154,20 +154,25 @@ func sentinelDialer(dopts []redis.DialOption, keepAlivePeriod time.Duration) red ...@@ -154,20 +154,25 @@ func sentinelDialer(dopts []redis.DialOption, keepAlivePeriod time.Duration) red
return nil, err return nil, err
} }
dopts = append(dopts, redis.DialNetDial(keepAliveDialer(keepAlivePeriod))) dopts = append(dopts, redis.DialNetDial(keepAliveDialer(keepAlivePeriod)))
return redis.Dial("tcp", address, dopts...) return redisDial("tcp", address, dopts...)
} }
} }
func defaultDialer(dopts []redis.DialOption, keepAlivePeriod time.Duration, url url.URL) redisDialerFunc { func defaultDialer(dopts []redis.DialOption, keepAlivePeriod time.Duration, url url.URL) redisDialerFunc {
return func() (redis.Conn, error) { return func() (redis.Conn, error) {
if url.Scheme == "unix" { if url.Scheme == "unix" {
return redis.Dial(url.Scheme, url.Path, dopts...) return redisDial(url.Scheme, url.Path, dopts...)
} }
dopts = append(dopts, redis.DialNetDial(keepAliveDialer(keepAlivePeriod))) dopts = append(dopts, redis.DialNetDial(keepAliveDialer(keepAlivePeriod)))
return redis.Dial(url.Scheme, url.Host, dopts...) return redisDial(url.Scheme, url.Host, dopts...)
} }
} }
func redisDial(network, address string, options ...redis.DialOption) (redis.Conn, error) {
log.Printf("redis: dialing %q, %q", network, address)
return redis.Dial(network, address, options...)
}
func countDialer(dialer redisDialerFunc) redisDialerFunc { func countDialer(dialer redisDialerFunc) redisDialerFunc {
return func() (redis.Conn, error) { return func() (redis.Conn, error) {
c, err := dialer() c, err := dialer()
...@@ -236,7 +241,7 @@ func Get() redis.Conn { ...@@ -236,7 +241,7 @@ func Get() redis.Conn {
func GetString(key string) (string, error) { func GetString(key string) (string, error) {
conn := Get() conn := Get()
if conn == nil { if conn == nil {
return "", fmt.Errorf("Not connected to redis") return "", fmt.Errorf("redis: could not get connection from pool")
} }
defer conn.Close() defer conn.Close()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment