preface

This article documents the basic operation of Redis in GO language. There are two common operation libraries: Redigo and Go-redis. Currently, Go-redis has more stars on Github, so choose the latter and record the use of its basic command in Go.

The installation

go get github.com/go-redis/redis/v8
Copy the code

Client connection

Standalone connection NewClient

func main(a) {
  ctx := context.Background()
  rdb := redis.NewClient(&redis.Options{
      Username: "",
      Password: "".// default: localhost:6379
      Addr: "localhost:6381",
      DB: 1,
      PoolSize: 5,})// Ping to check the connection
  pingResult, err := rdb.Ping(ctx).Result()
  iferr ! =nil {
      log.Fatal(err)
  }
  // PONG
  fmt.Println(pingResult)
}
Copy the code

Cluster connection NewClusterClient

The connection is in cluster mode

func main(a) {
  ctx := context.Background()
  rdb := redis.NewClusterClient(&redis.ClusterOptions{
      Username: "",
      Password: "",
      Addrs:    []string{": 6381".": 6379"},
      PoolSize: 20,
  })
  pingResult, err := rdb.Ping(ctx).Result()
  iferr ! =nil {
      log.Fatal(err)
  }
  // PONG
  fmt.Println(pingResult)
}
Copy the code

Manually create a cluster connection

You can also create cluster connections manually in standalone mode

func main(a) {
  ctx := context.Background()
  clusterSlots := func(ctx context.Context) ([]redis.ClusterSlot, error) {
      slots := []redis.ClusterSlot{
          // First master:slave node
          {
              Start: 0,
              End:   8191,
              Nodes: []redis.ClusterNode{{
                  Addr: ": 7000".// master
              }, {
                  Addr: ": 8000".// slave}}},// The second master:slave node
          {
              Start: 8192,
              End:   16383,
              Nodes: []redis.ClusterNode{{
                  Addr: ": 7001".// master
              }, {
                  Addr: ": 8001".// slave}}},}return slots, nil
  }

  rdb := redis.NewClusterClient(&redis.ClusterOptions{
      ClusterSlots:  clusterSlots,
      RouteRandomly: true,
  })
  rdb.Ping(ctx)
}
Copy the code

Sentinel connection NewFailoverClient

Concurrent secure connections

func main(a) {
  ctx := context.Background()
  rdb := redis.NewFailoverClient(&redis.FailoverOptions{
      Username: "",
      Password: "",
      DB: 0,
      MasterName: "master",
      SentinelAddrs: []string{": 2378"},
      SentinelPassword: "",
  })
  rdb.Ping(ctx)
}
Copy the code

Shard connection NewRing

func main(a) {
  ctx := context.Background()
  rdb := redis.NewRing(&redis.RingOptions{
      Username: "",
      Password: "",
      DB:       0,
      PoolSize: 10,
      Addrs: map[string]string{
          "shard1": ": 7000"."shard2": ": 7001"."shard3": ": 7002",
      },
  })
  rdb.Ping(ctx)
}
Copy the code

Universal connection NewUniversalClient

Return different clients according to different options passed

package main

import (
	"github.com/go-redis/redis/v8"
)

func main(a) {
  rdb1 := redis.NewUniversalClient(&redis.UniversalOptions{
      // If the adDRS slice length is greater than or equal to 2, a ClusterClient will be returned
      Addrs: []string{": 7000".": 7001".": 7002".": 7003".": 7004".": 7005"}})defer rdb1.Close()

  rdb2 := redis.NewUniversalClient(&redis.UniversalOptions{
      // The MasterName parameter is passed and a FailoverClient based on Sentinel is returned
      MasterName: "master",
      Addrs:      []string{": 26379"}})defer rdb2.Close()

  rdb3 := redis.NewUniversalClient(&redis.UniversalOptions{
      // The addrs slice length is 1 and will return a normal single-node client NewClient
      Addrs: []string{": 6379"}})defer rdb3.Close()
}
Copy the code

Parse the URL to connect

package main

import (
	"context"

	"github.com/go-redis/redis/v8"
)

func main(a) {
  options, err := redis.ParseURL("redis://username:password@localhost:6379/1")
  iferr ! =nil {
      panic(err)
  }

  rdb := redis.NewClient(options)
  defer rdb.Close()

  rdb.Ping(context.Background())
}
Copy the code

Basic operation

String

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-redis/redis/v8"
)

func main(a) {
  ctx := context.Background()
  rdb := redis.NewClient(&redis.Options{
      Addr: "localhost:6381",
  })

  statusCmd := rdb.Set(ctx, "language"."golang".5*time.Second)
  fmt.Println(statusCmd.String())
  // set language golang ex 5: OK
  fmt.Println(statusCmd.Result())
  // OK <nil>

  stringCmd := rdb.Get(ctx, "language")
  fmt.Println(stringCmd.String())
  // get language: golang
  fmt.Println(stringCmd.Result())
  // golang <nil>

  stringCmd2 := rdb.GetSet(ctx, "language"."php")
  fmt.Println(stringCmd2.String())
  // getset language php: golang
  fmt.Println(stringCmd2.Result())
  // golang <nil>


  boolCmd := rdb.SetNX(ctx, "language"."go".5*time.Second)
  fmt.Println(boolCmd.Result())
  // OK <nil>

  intCmd := rdb.StrLen(ctx,"language")
  fmt.Println(intCmd.Result())
  // 3 <nil>

  intCmd = rdb.Append(ctx, "language"."is the best")
  fmt.Println(intCmd.Result())
  // 14 <nil>
  str, _ := rdb.Get(ctx, "language").Result()
  fmt.Println(len(str))
  / / 14

  // statusCmd2 := rdb.MSet(ctx, []interface{}{"php", "world best", "go", 666})
  // statusCmd2 := rdb.MSet(ctx, map[string]interface{}{"php": "world best", "go": 666})
  statusCmd2 := rdb.MSet(ctx, "php"."world best"."go".Awesome!) // Three ways
  fmt.Println(statusCmd2.Result())
  // OK <nil>

  sliceCmd := rdb.MGet(ctx, "php"."go")
  fmt.Println(sliceCmd.Result())
  // [world best 666] <nil>

  intCmd2 := rdb.Incr(ctx,"go")
  fmt.Println(intCmd2.Result())
  // 667 <nil>

  intCmd = rdb.Decr(ctx,"go")
  fmt.Println(intCmd.Result())
  // 666 <nil>

  intCmd3 := rdb.IncrBy(ctx, "go".333)
  fmt.Println(intCmd3.Result())
  // 999 <nil>

  intCmd3 = rdb.DecrBy(ctx, "go".333)
  fmt.Println(intCmd3.Result())
  // 666 <nil>

  floatCmd := rdb.IncrByFloat(ctx, "go".0.666)
  fmt.Println(floatCmd.Result())
  / / 666.666 < nil >
}
Copy the code

List

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/go-redis/redis/v8"
)

func main(a) {
  ctx := context.Background()
  rdb := redis.NewClient(&redis.Options{
      Addr: "localhost:6381",})// Add a value to an existing list, return length 0 if it does not exist, and push will not succeed or print an error
  intCmd := rdb.RPushX(ctx, "not:exists:list"."a"."b"."c")
  length, err := intCmd.Result()
  iferr ! =nil {
      log.Fatal(err)
  }
  fmt.Println(length, err)
  // 0 <nil>

  // Add elements to the list, whether the list exists or not
  intCmd = rdb.RPush(ctx, "example:list"."a"."b"."c")
  length, err = intCmd.Result()
  fmt.Println(length, err)
  // 3 <nil>

  // Remove the last element of the list
  strCmd := rdb.RPop(ctx, "example:list")
  result, err := strCmd.Result()
  fmt.Println(result, err)
  // c <nil>

  // Remove the last element of the list, add that element to another list and return
  strCmd = rdb.RPopLPush(ctx, "example:list"."example:list2")
  result, err = strCmd.Result()
  fmt.Println(result, err)
  // b <nil>

  // List the length
  intCmd = rdb.LLen(ctx, "example:list2")
  fmt.Println(intCmd.Result())
  // 1 <nil>

  // Lists the values of an index location
  strCmd = rdb.LIndex(ctx, "example:list2".0)
  fmt.Println(strCmd.Result())
  // b <nil>

  // Lists the values of a range index
  strSliceCmd := rdb.LRange(ctx, "example:list2".0.- 1)
  fmt.Println(strSliceCmd.Result())
  // [b] <nil>

  // Pops the last element, if not, blocks the list until the wait times out or a popable element is found.
  If there is no value in the first key, pop from the second key
  strSliceCmd = rdb.BRPop(ctx, 5*time.Second, "example:list"."example:list2")
  var results []string
  results, err = strSliceCmd.Result()
  fmt.Println(results, err)
  // [example:list a] <nil>

  // Insert a before B in the list, and return the inserted list element
  intCmd = rdb.LInsert(ctx, "example:list2"."before"."b"."a")
  / / equivalent to the
  // intCmd = rdb.LInsertBefore(ctx, "example:list2", "b", "a")
  fmt.Println(intCmd.Result())
  // 2 <nil>
  fmt.Println(rdb.LRange(ctx, "example:list2".0.- 1).Result())
  // [a b] <nil>

  // Set the value of the list element by index
  statusCmd := rdb.LSet(ctx, "example:list2".0."c")
  fmt.Println(statusCmd.Result())
  // OK <nil>

  // How many removed elements, 0 represents all removed elements, returns the number of removed elements
  intCmd = rdb.LRem(ctx, "example:list2".0."c")
  fmt.Println(intCmd.Result())
  // 1 <nil>
}
Copy the code

Set

package main

import (
	"context"
	"fmt"

	"github.com/go-redis/redis/v8"
)

func main(a) {
  ctx := context.Background()
  rdb := redis.NewClient(&redis.Options{
      Addr: "localhost:6381",})// Add elements to the collection, return the number of successfully added elements
  intCmd := rdb.SAdd(ctx, "example:set1"."s1"."s2"."s3")
  intCmd = rdb.SAdd(ctx, "example:set2"."e1"."e2"."e3"."s1")
  fmt.Println(intCmd.Result())
  // 4 <nil>

  // The difference set returns the elements in set1 that are different from set2
  strSliceCmd := rdb.SDiff(ctx, "example:set1"."example:set2")
  fmt.Println(strSliceCmd.Result())
  // [s2 s3] <nil>

  / / intersection
  strSliceCmd = rdb.SInter(ctx, "example:set1"."example:set2")
  fmt.Println(strSliceCmd.Result())
  // [s1] <nil>

  / / and set
  strSliceCmd = rdb.SUnion(ctx, "example:set1"."example:set2")
  fmt.Println(strSliceCmd.Result())
  // [s3 s1 e3 e1 s2 e2] <nil>

  // Store the intersections into the storeInter collection and return the number of intersections
  intCmd = rdb.SInterStore(ctx, "storeInter"."example:set1"."example:set2")
  fmt.Println(rdb.SMembers(ctx, "storeInter").Result())
  // [s1] <nil>
  fmt.Println(intCmd.Result())
  // 1 <nil>

  // Store the difference set into the storeDiff set and return the number of difference sets
  intCmd = rdb.SDiffStore(ctx, "storeDiff"."example:set1"."example:set2")
  fmt.Println(rdb.SMembers(ctx, "storeDiff").Result())
  // [s2 s3] <nil>
  fmt.Println(intCmd.Result())
  // 2 <nil>

  // Store the union into the storeUnion collection and return the number of unions
  intCmd = rdb.SUnionStore(ctx, "storeUnion"."example:set1"."example:set2")
  fmt.Println(rdb.SMembers(ctx, "storeUnion").Result())
  // [s3 s1 e3 e1 s2 e2] <nil>
  fmt.Println(intCmd.Result())
  // 6 <nil>

  // Check whether it is an element of the set
  boolCmd := rdb.SIsMember(ctx,"example:set1"."s1")
  fmt.Println(boolCmd.Result())
  // true <nil>

  // Get the number of member sets
  intCmd = rdb.SCard(ctx, "example:set1")
  fmt.Println(intCmd.Result())
  // 3 <nil>

  // Get all members
  strSliceCmd = rdb.SMembers(ctx, "example:set1")
  fmt.Println(strSliceCmd.Result())
  //[s2 s3 s1] <nil>

  // Get 1 random member
  strCmd := rdb.SRandMember(ctx, "example:set1")
  fmt.Println(strCmd.Result())
  // s2 <nil>

  // Get N random members
  strSliceCmd = rdb.SRandMemberN(ctx, "example:set1".2)
  fmt.Println(strSliceCmd.Result())
  // [s2 s3] <nil>

  // Remove and return a random member
  strCmd = rdb.SPop(ctx, "example:set1")
  fmt.Println(strCmd.Result())
  // s1 <nil>

  // The number of successfully removed members is returned
  intCmd = rdb.SRem(ctx, "example:set1"."s1"."s2")
  fmt.Println(intCmd.Result())
  // 1 <nil>

  fmt.Println(rdb.SMembers(ctx,"example:set2").Result())
  // [e3 e1 s1 e2] <nil>
  // Scan the return value to match e*
  scanCmd := rdb.SScan(ctx, "example:set2".0 , "e*".1)
  var res []string
  res, cursor, err := scanCmd.Result()
  fmt.Println(res, cursor, err)
  // [e3] 4 <nil> 

  // Continue scanning from the last cursor
  scanCmd = rdb.SScan(ctx, "example:set2", cursor, "e*".1)
  fmt.Println(scanCmd.Result())
  // [e1] 2 <nil>
}
Copy the code

SortedSet

package main

import (
	"context"
	"fmt"

	"github.com/go-redis/redis/v8"
)

func main(a) {
  ctx := context.Background()
  rdb := redis.NewClient(&redis.Options{
      Addr: "localhost:6381",})// Add elements to the ordered collection and return the number of successful additions
  intCmd := rdb.ZAdd(ctx, "sorted:set1", &redis.Z{Score: 10, Member: "s1"}, &redis.Z{Score: 20, Member: "s2"})
  intCmd = rdb.ZAdd(ctx, "sorted:set2", &redis.Z{Score: 10, Member: "e1"}, &redis.Z{Score: 20, Member: "e2"})
  fmt.Println(intCmd.Result())
  // 2 <nil>

  // Get the number of elements in the ordered set
  intCmd = rdb.ZCard(ctx, "sorted:set1")
  fmt.Println(intCmd.Result())
  // 2 <nil>

  // Get the member score value
  floatCmd := rdb.ZScore(ctx, "sorted:set1"."s2")
  fmt.Println(floatCmd.Result())
  // 20 <nil>

  // Get the number of elements in the ordered set
  intCmd = rdb.ZCount(ctx, "sorted:set1"."15"."30")
  fmt.Println(intCmd.Result())
  // 1 <nil>

  // Gets the number of members in the specified dictionary range, including s1 and all subsequent members
  intCmd = rdb.ZLexCount(ctx, "sorted:set1"."[s1"."-")
  fmt.Println(intCmd.Result())
  // 2 <nil>

  // Gets the member of the specified index range
  strSliceCmd := rdb.ZRange(ctx, "sorted:set1".0.- 1)
  fmt.Println(strSliceCmd.Result())
  // [s1 s2] <nil>

  // Gets the member with a sort score in the specified index range
  zSliceCmd := rdb.ZRangeWithScores(ctx, "sorted:set1".0.- 1)
  fmt.Println(zSliceCmd.Result())
  // [{10 s1} {20 s2}] <nil>

  // Gets the members of the specified score range
  strSliceCmd = rdb.ZRangeByScore(ctx, "sorted:set1", &redis.ZRangeBy{Max: "10"})
  fmt.Println(strSliceCmd.Result())
  // [s1] <nil>

  // Returns the members of the ordered collection through the dictionary range. Performs interval operations directly on member values, (-> not included, [-> included)
  strSliceCmd = rdb.ZRangeByLex(ctx, "sorted:set1", &redis.ZRangeBy{Min: "-", Max: "(s2"})
  fmt.Println(strSliceCmd.Result())
  // [s1] <nil>
  strSliceCmd = rdb.ZRangeByLex(ctx, "sorted:set1", &redis.ZRangeBy{Min: "-", Max: "[s2"})
  fmt.Println(strSliceCmd.Result())
  // [s1 s2] <nil>
  strSliceCmd = rdb.ZRangeByLex(ctx, "sorted:set1", &redis.ZRangeBy{Min: "(s1", Max: "[s2"})
  fmt.Println(strSliceCmd.Result())
  // [s2] <nil>
  strSliceCmd = rdb.ZRangeByLex(ctx, "sorted:set1", &redis.ZRangeBy{Min: "[s1", Max: "[s2"})
  fmt.Println(strSliceCmd.Result())
  // [s1 s2] <nil>

  // Gets the index of the specified member
  intCmd = rdb.ZRank(ctx, "sorted:set1"."s2")
  fmt.Println(intCmd.Result())
  // 1 <nil>

  // Increases the score of the specified member
  floatCmd = rdb.ZIncr(ctx, "sorted:set1", &redis.Z{Score: 40, Member: "s2"})
  fmt.Println(floatCmd.Result())
  // 60 <nil>

  // Increases the score of the specified member
  floatCmd = rdb.ZIncrBy(ctx, "sorted:set1".0.5."s1")
  fmt.Println(floatCmd.Result())
  / / 10.5 < nil >

  // Store the intersection in a new ordered collection
  intCmd = rdb.ZInterStore(ctx, "zStoreInter", &redis.ZStore{Keys: []string{"sorted:set1"."sorted:set2"}})
  fmt.Println(intCmd.Result())
  // 0 <nil>
  fmt.Println(rdb.ZRange(ctx,"zStoreInter".0 ,- 1).Result())
  // [] <nil>

  // Store the union in a new ordered set
  intCmd = rdb.ZUnionStore(ctx, "zStoreUnion", &redis.ZStore{Keys: []string{"sorted:set1"."sorted:set2"}})
  fmt.Println(intCmd.Result())
  // 4 <nil>
  fmt.Println(rdb.ZRange(ctx,"zStoreUnion".0 ,- 1).Result())
  // [e1 s1 e2 s2] <nil>

  // Returns the members of the index range, sorted by score from highest to lowest
  strSliceCmd = rdb.ZRevRange(ctx, "zStoreUnion".0.- 1)
  fmt.Println(strSliceCmd.Result())
  // [s2 e2 s1 e1] <nil>

  // Returns the members of the specified score range, sorted by score from highest to lowest
  strSliceCmd =  rdb.ZRevRangeByScore(ctx, "zStoreUnion", &redis.ZRangeBy{Max: "20"})
  fmt.Println(strSliceCmd.Result())
  // [e2 s1 e1] <nil>

  // Returns the ranking of the specified members in order of the highest to lowest score
  intCmd = rdb.ZRevRank(ctx, "zStoreUnion"."s1")
  fmt.Println(intCmd.Result())
  // 2 <nil>

  // Remove the member
  intCmd = rdb.ZRem(ctx, "zStoreUnion"."s1")
  fmt.Println(intCmd.Result())
  // 1 <nil>

  // Remove a member from the specified segment
  intCmd = rdb.ZRemRangeByScore(ctx, "zStoreUnion"."0"."25")
  fmt.Println(intCmd.Result())
  // 2 <nil>

  // Remove a member from the specified ranking range
  intCmd = rdb.ZRemRangeByRank(ctx, "zStoreUnion".1.1)
  fmt.Println(intCmd.Result())
  // 1 <nil>

  // Removes a member from the specified dictionary range
  intCmd = rdb.ZRemRangeByLex(ctx, "zStoreUnion"."[e1"."(e2")
  fmt.Println(intCmd.Result())
  // 1 <nil>

  // Scan the return value to match e*
  scanCmd := rdb.ZScan(ctx, "sorted:set2".0 , "e*".1)
  var res []string
  res, cursor, err := scanCmd.Result()
  fmt.Println(res, cursor, err)
  // [e1 10 e2 20] 0 <nil>

  // Continue scanning from the last cursor
  scanCmd = rdb.ZScan(ctx, "sorted:set2", cursor, "e*".1)
  fmt.Println(scanCmd.Result())
  // [e1 10 e2 20] 0 <nil>
}
Copy the code

Hash

package main

import (
	"context"
	"fmt"

	"github.com/go-redis/redis/v8"
)

func main(a) {
  ctx := context.Background()
  rdb := redis.NewClient(&redis.Options{
      Addr: "localhost:6381",})// Set the hash field:value. The three methods are equivalent
  // HMSet() and HMGet() are deprecated in redis3
  intCmd := rdb.HSet(ctx, "example:hash1"."name"."Zhang"."role"."Outlaw.")
  intCmd = rdb.HSet(ctx, "example:hash2".map[string]interface{} {"name": "Bill"."role": "?????"})
  intCmd = rdb.HSet(ctx, "example:hash3"And []string{"name"."Fifty"."role"."!!!"})
  fmt.Println(intCmd.Result())
  // 2 <nil>

  // Set field:value only if the field does not exist
  boolCmd := rdb.HSetNX(ctx, "example:hash1"."age".16)
  fmt.Println(boolCmd.Result())
  // true <nil>

  // Get the number of hash fields
  intCmd = rdb.HLen(ctx, "example:hash1")
  fmt.Println(intCmd.Result())
  // 3 <nil>

  // Gets the value of the specified field
  strCmd := rdb.HGet(ctx, "example:hash1"."role")
  fmt.Println(strCmd.Result())
  // 
      

  // Get the hash field
  strSliceCmd := rdb.HKeys(ctx, "example:hash1")
  fmt.Println(strSliceCmd.Result())
  // [name role age] <nil>

  // Get the hash field value
  strSliceCmd = rdb.HVals(ctx, "example:hash1")
  fmt.Println(strSliceCmd.Result())
  // 
      

  // Get all fields and values
  strStrMapCmd := rdb.HGetAll(ctx, "example:hash1")
  fmt.Println(strStrMapCmd.Result())
  // map[age:16 name: role: outlaw] 
      

  // Check whether the field exists
  boolCmd = rdb.HExists(ctx, "example:hash1"."name")
  fmt.Println(boolCmd.Result())
  // true <nil>

  // Is the added value of the field
  intCmd = rdb.HIncrBy(ctx, "example:hash1"."age".2)
  fmt.Println(intCmd.Result())
  // 18 <nil>

  // Add a floating point value to the field
  floatCmd := rdb.HIncrByFloat(ctx, "example:hash1"."age".0.5)
  fmt.Println(floatCmd.Result())
  / / 18.5 < nil >

  // Delete the field
  intCmd = rdb.HDel(ctx, "example:hash1"."name"."role"."age")
  fmt.Println(intCmd.Result())
  // 3 <nil>

  // Iteratively scan hash key-value pairs
  scanCmd := rdb.HScan(ctx, "example:hash2".0."name".1)
  fmt.Println(scanCmd.Result())
  // [name] 0 
      
}
Copy the code

Stream

See message queue below.

The message queue

Introduction of the Stream

In daily development, we can implement a simple message queue using Redis’s List type and call its blocking or non-blocking API, but this approach is often limited to simple functional requirements. A new data type, Stream, introduced in Redis 5.0, emulates a log-like data structure in memory in an abstract manner, an add-on only data structure. Just like opening a log file in memory in append mode, you can only keep appending content. Stream is primarily used for message queues, allowing consumers to block while producers send new messages into the Stream, and creating consumer groups. Because Stream is a log-like data structure, historical messages can be recorded during message delivery to ensure that messages are not lost.

The Stream message queue looks like this:

  • eachStreamThey all have a name, which we set by commandREDIS KEY.
  • throughXADDCan be specifiedStreamAppends a new message entry. A Stream message entry is not a simple string, but structured data consisting of one or more key-value pairs.
  • The ID structure of each message consists of the server’s millisecond timestamp at the time the ID was generated, plus a monotonically increasing ordinal number. like<millisecondsTime>-<sequenceNumber>.by virtue of the structure of ID,StreamSupports range query by id and time.
  • StreamFlow and congestionListThere are differences in behavior as follows:
    • By default, each new message entry in Stream is distributed to each waiting consumer. Each consumer in the List gets a different element.
    • All messages in the Stream are appended to the Stream indefinitely (unless the user explicitly requests that the entry be removed), meaning that the message is not deleted in the Stream, and the ID (last_deliver_id) of the last message read is recorded by different consumer groups. In a blocking List, a message is dropped from the List, effectively deleted.
  • Consumer groups fetch data from the stream to serve multiple consumers. The ability to explicitly acknowledge processed messages, examine pending messages, and declare unprocessed messages. The consumer group has the following functions:
    • Each message is provided to a different consumer, and the same message cannot be delivered to multiple consumers.
    • Consumers have unique name identifiers in consumer groups that are created automatically when they are first used and do not need to be explicitly created.
    • Each consumer group records the ID of the last message retrieved, and when a consumer requests a new message, only messages greater than the current ID are provided. Any consumer that reads the message makes the cursorlast_deliver_idMove back.
    • When the message is delivered to the consumer, it is not confirmed as processed. Consuming messages require explicit validation using specific commandsACK. Pending_ids records that it has been read, but not yetACKMessage ID of.
  • Consumer groups can read messages from multiple streams at the same time, but they must have the same name in each Stream.
  • Each message has a counter, only when calledXREADGROUPRead, callXCLAIMIt only increases by 1 when you change ownership.

Common commands

Message queue related commands:

  • XADD – Adds the message to the end
  • XTRIM – Trim by convection, limiting length
  • XDEL – Deletes the message
  • XLEN – Gets the number of elements contained in the stream, that is, the length of the message
  • XRANGE – Gets a list of messages and automatically filters deleted messages
  • XREVRANGE – Retrieves a list of messages in reverse, with ids from large to small
  • XREAD – Gets a list of messages in blocking or non-blocking mode

Consumer Group related commands:

  • XGROUP CREATE – Creates a consumer group
  • XREADGROUP GROUP – Reads messages in consumer groups
  • XACK – Marks the message as “processed”
  • XGROUP SETID – Sets the new final delivery message ID for the consumer group
  • XGROUP DELCONSUMER – Deletes consumer
  • XGROUP DESTROY – Deletes consumer groups
  • XPENDING – Displays information about pending messages
  • XCLAIM – Transfer the attribution of a message
  • XINFO – View information about streams and consumer groups;
  • XINFO GROUPS – Prints information about consumer GROUPS;
  • XINFO STREAM – Prints STREAM information

Go in operation

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/go-redis/redis/v8"
)

func main(a) {
  ctx := context.Background()
  rdb := redis.NewClient(&redis.Options{
      Addr: "localhost:6381",})// Add a message to stream and return the message ID
  xAddArgs := &redis.XAddArgs{
      Stream: "example:stream",
      MaxLen: 10,
      Values: map[string]interface{} {"foodId": "10001"."foodName": "Mapo tofu"},
  }
  strCmd := rdb.XAdd(ctx, xAddArgs)
  fmt.Println(strCmd.Result())
  // 1609083771429-0 <nil>

  // Insert second message
  strCmd = rdb.XAdd(ctx, &redis.XAddArgs{Stream: "example:stream", Values: []string{"foodId"."10002"."foodName"."Fragrant Chicken Wings."}})
  fmt.Println(strCmd.Result())
  // 1609083771430-0 <nil>

  // Limit the length of the stream to about 10
  intCmd := rdb.XTrimApprox(ctx, "example:stream".10)
  fmt.Println(intCmd.Result())
  // 0 <nil>

  // Get the length of the stream
  intCmd = rdb.XLen(ctx, "example:stream")
  fmt.Println(intCmd.Result())
  // 2 <nil>

  // Get the list of messages
  xMessageSliceCmd := rdb.XRange(ctx, "example:stream"."-"."+")
  fmt.Println(xMessageSliceCmd.Result())
  // [{1609083771429-0 map[foodId:10001 foodName: mapo tofu]} {1609083771430-0 map[foodId:10002 foodName: mapo tofu]}] 
      

  // Get the list of messages in reverse, starting with the latest message
  xMessageSliceCmd = rdb.XRevRange(ctx, "example:stream"."+"."-")
  fmt.Println(xMessageSliceCmd.Result())
  // [{1609083771430-0 map[foodId:10002 foodName: mapo tofu]} {1609083771429-0 map[foodId:10001 foodName: mapo tofu]}] 
      

  // Reads the next message with the given ID
  xReadArgs := &redis.XReadArgs{
      Streams: []string{"example:stream"."1609083771429-0"},
      Count:   1,
      Block:   5 * time.Second,
  }
  xStreamSliceCmd := rdb.XRead(ctx, xReadArgs)
  fmt.Println(xStreamSliceCmd.Result())
  // [{example:stream [{1609083771430-0 map[foodId:10002 foodName: chicken wing]}]}] 
      
        [{example:stream [{1609083771430-0 map[foodId:10002 foodName: chicken wing]}]
      

  // Delete the message
  intCmd = rdb.XDel(ctx, "example:stream"."1609083771430-0"."1609083771429-0")
  fmt.Println(intCmd.Result())
  // 2 <nil>

// Create a consumer group, Eater, on stream to start consuming from the latest message ($representation)
  statusCmd := rdb.XGroupCreate(ctx, "example:stream"."eater"."$")
  fmt.Println(statusCmd.Result())
  // OK <nil>

  // Read messages not read by other consumers in the consumer group Eater >
  XADD "example:stream" * foodId 1003 foodName will get the result
  xReadGroupArgs := &redis.XReadGroupArgs{
      Group:    "eater".// Consumer group
      Consumer: "eater01".// the consumer is created when used
      Streams:  []string{"example:stream".">"}, / / the stream flow
      Block:    0.// Wait indefinitely
      NoAck:    false.// Confirmation is required
  }
  xStreamSliceCmd = rdb.XReadGroup(ctx, xReadGroupArgs)
  xStreamSlice, err := xStreamSliceCmd.Result()
  iferr ! =nil {
      log.Fatal(err)
  }
  fmt.Println(xStreamSlice)
  // [{example:stream [{1609086089189-0 map[foodId:1003 foodName: 1003]}]}] // [{example:stream [{1609086089189-0 map[foodId:1003 foodName: 1003]}]

  // Confirm that the message is processed
  intCmd = rdb.XAck(ctx, "example:stream"."eater"."1609086089189-0")
  fmt.Println(intCmd.Result())
  // 1 <nil>

  // Set the last delivery ID to 1609086089189-0
  statusCmd = rdb.XGroupSetID(ctx, "example:stream"."eater"."1609086089189-0")
  fmt.Println(statusCmd.Result())
  // OK <nil>

  // View processing messages
  // type XPending struct {
  // Count int64
  // Lower string
  // Higher string
  // Consumers map[string]int64
  // }
  xPendingCmd := rdb.XPending(ctx, "example:stream"."eater")
  fmt.Println(xPendingCmd.Result())
  // &{1 1609086342551-0 1609086342551-0 map[eater01:1]} <nil>

  // Transfer attribution of messages to transfer messages that have not been processed for more than two minutes to consumer eater02
  xClaimArgs := &redis.XClaimArgs{
      Stream:   "example:stream",
      Group:    "eater",
      Consumer: "eater02",
      MinIdle:  2 * time.Minute,
      Messages: []string{"1609086342551-0"},
  }
  xMessageSliceCmd = rdb.XClaim(ctx, xClaimArgs)
  fmt.Println(xMessageSliceCmd.Result())
  // [] 
      
        // There is no message that meets the requirement
      

  // Check the flow information
  // type XInfoStream struct {
  // Length int64
  // RadixTreeKeys int64
  // RadixTreeNodes int64
  // Groups int64
  // LastGeneratedID string
  // FirstEntry XMessage
  // LastEntry XMessage
  // }
  xInfoStreamCmd := rdb.XInfoStream(ctx, "example:stream")
  fmt.Println(xInfoStreamCmd.Result())
  // &{3 1 2 1 1609086342551-0 {1609082364313-0 map[foodId:10001 foodName: mapo tofu]} {1609086342551-0 map[foodId:1003 FoodName: fat house happy water]}} 
      

  // View consumer group messages
  // type XInfoGroup struct {
  // Name string
  // Consumers int64
  // Pending int64
  // LastDeliveredID string
  // }
  xInfoGroupCmd := rdb.XInfoGroups(ctx, "example:stream")
  fmt.Println(xInfoGroupCmd.Result())
  // [{eater 0 0 1609086089189-0}] <nil>

  // Delete consumer
  intCmd = rdb.XGroupDelConsumer(ctx, "example:stream"."eater"."eater01")
  fmt.Println(intCmd.Result())
  // 1 <nil>

  // Delete the consumer group
  intCmd = rdb.XGroupDestroy(ctx, "example:stream"."eater")
  fmt.Println(intCmd.Result())
  // 1 <nil>
}
Copy the code

Release subscription

// pub.go
package main

import (
	"context"
	"fmt"

	"github.com/go-redis/redis/v8"
)

func pub(a) {
  ctx := context.Background()
  rdbPub := redis.NewClient(&redis.Options{
      Addr: "localhost:6381",
  })
  intCmd := rdbPub.Publish(ctx, "example:channel"."Tomorrow is a holiday!!")
  fmt.Println(intCmd.Result())
}

func main(a) {
  pub()
}
Copy the code
// sub.go
package main

import (
	"context"
	"fmt"

	"github.com/go-redis/redis/v8"
)

func sub(a) {
  ctx := context.Background()

  rdbSub := redis.NewClient(&redis.Options{
      Addr: "localhost:6381",
  })

  sub := rdbSub.Subscribe(ctx, "example:channel")
  // Match pattern to subscribe
  // sub:= rdbSub.PSubscribe(ctx, "*:channel")

  select {
  case message := <-sub.Channel():
      fmt.Println(message.Channel)
      // example:channel
      fmt.Println(message.Pattern)
      //
      fmt.Println(message.Payload)
      Tomorrow is a holiday!!
      fmt.Println(message.PayloadSlice)
      / / []
      fmt.Println(message.String())
      // Message
      
  }
  defer sub.Close()

  // Get all channels that match pattern
  strSliceCmd := rdbSub.PubSubChannels(ctx, "*:channel")
  fmt.Println(strSliceCmd.Result())
  // [example:channel] <nil>

  // Get the number of patterns
  intCmd := rdbSub.PubSubNumPat(ctx)
  fmt.Println(intCmd.Result())
  // 0 <nil>

  // Get the number of subscribers on the specified channel < code subscription 1 + terminal command line subscription 1 >
  strIntMapCmd := rdbSub.PubSubNumSub(ctx, "example:channel")
  fmt.Println(strIntMapCmd.Result())
  // map[example:channel:2] <nil>
}

func main(a) {
  sub()
}
Copy the code

The resources

Pkg.go.dev/github.com/…

www.redis.cn/topics/stre…