Redis Streams in Action — Part 3 (Tweet Processor)

Welcome to this collection of weblog posts that covers Redis Streams with the assistance of a sensible instance. We’ll use a pattern software to make Twitter knowledge out there for search and question in real-time. RediSearch and Redis Streams function the spine of this answer that consists of a number of co-operating elements, every of which might be coated in a devoted weblog put up.

The code is on the market on this GitHub repo – https://github.com/abhirockzz/redis-streams-in-action

This weblog put up will cowl a Java-based tweets processor software whose function is to choose up tweets from Redis Streams and retailer them (as a HASH) in order that they are often queried utilizing RediSearch (the correct time period for that is “indexing paperwork” in RediSearch). You’ll deploy the appliance to Azure, validate it, run a couple of RediSearch queries to look tweets. Lastly, there’s a part the place we’ll stroll by way of the code to grasp “how issues work”.

Pre-requisites

Please just be sure you learn half 2 of this collection and have the Tweets client software up and operating. This software will learn tweets from the Twitter Streaming API and push them to Redis Streams. Our tweets processor app (the one described on this weblog) will then take over.

You have to an Azure account which you may get totally free and the Azure CLI. Just like the earlier software, this one may also be deployed to Azure Container Situations utilizing common Docker CLI instructions. This functionality is enabled by integration between Docker and Azure. Simply guarantee that you’ve Docker Desktop model 2.3.0.5 or later, for Home windows, macOS, or set up the Docker ACI Integration CLI for Linux.

Deploy the App to Azure Container Situations

When you’ve been following alongside from the earlier weblog put up, you must have arrange the Enterprise tier of Azure Cache for Redis, utilizing this quickstart. When you end this step, be sure that you save the next data: the Redis host title and Entry key

The applying is on the market as a Docker container – the simplest means is to easily re-use it. When you want to construct your individual picture, please use the Dockerfile out there on the GitHub repo.

When you select to construct your individual picture, be certain that to construct the JAR file utilizing Maven (mvn clear set up) first

It is actually handy to deploy it to Azure Container Situations, which lets you run Docker containers on-demand in a managed, serverless Azure atmosphere.

Ensure you create an Azure context to affiliate Docker with an Azure subscription and useful resource group so you may create and handle container situations.

docker login azure
docker context create aci aci-context
docker context use aci-context

Set the atmosphere variables – be certain that to replace Redis host and credentials as per your account:

export STREAM_NAME=tweets_stream # do not change
export STREAM_CONSUMER_GROUP_NAME=redisearch_app_group # do not change

export REDIS_HOST=<redis host port e.g. my-redis-host>
export REDIS_PORT=<redis port>
export REDIS_PASSWORD=<redis entry key (password)>
export SSL=true

.. after which use docker run to deploy the container to Azure:

docker run -d --name redis-streams-consumer 
-e STREAM_NAME=$STREAM_NAME 
-e STREAM_CONSUMER_GROUP_NAME=$STREAM_CONSUMER_GROUP_NAME 
-e REDIS_HOST=$REDIS_HOST 
-e REDIS_PORT=$REDIS_PORT 
-e REDIS_PASSWORD=$REDIS_PASSWORD 
-e SSL=$SSL 
abhirockzz/tweets-redis-streams-consumer-java

Because the container is being created, you must see an output just like this:

[+] Working 2/2
 ⠿ Group redis-streams-consumer  Created                                                                             5.2s
 ⠿ redis-streams-consumer        Created                                                                            10.5s

Validate this utilizing the Azure portal:

To verify the container logs, you should use the standard docker logs command:

docker logs redis-streams-consumer

It is best to see an output just like this:

Studying from stream tweets_stream with XREADGROUP
saved tweet to hash tweet:1393089239324282880
Studying from stream tweets_stream with XREADGROUP
saved tweet to hash tweet:1393089243539517441
Studying from stream tweets_stream with XREADGROUP
not processed - tweet:1393089247721132033
Studying from stream tweets_stream with XREADGROUP
saved tweet to hash tweet:1393089256105693184
Studying from stream tweets_stream with XREADGROUP
saved tweet to hash tweet:1393089260304179200
....

Discover the not processed logs? We’ll focus on them within the subsequent part

As soon as the app is up and operating, it’s going to begin consuming from tweets_stream Redis Stream and retailer every tweet information in a separate HASH, which in flip might be listed by RediSearch. Earlier than transferring on, login to the Redis occasion utilizing redis-cli:

redis-cli -h <hostname> -p 10000 -a <password> --tls

How Are Issues Wanting?

When you see the logs fastidiously, you must have the ability to discover the title of the HASH (which relies on the tweet ID) e.g. tweet:<tweet id>. Simply examine its contents with HGETALL:

redis-cli> TYPE tweet:1393089163856056320
redis-cli> hash
redis-cli> HGETALL tweet:1393089163856056320

The end result will appear like some other HASH. For e.g.

 1) "location"
 2) "Nairobi, Kenya"
 3) "textual content"
 4) "RT @WanjaNjubi: #EidMubarak xf0x9fx99x8fnMay peace be upon you now and at all times.n#EidUlFitr https://t.co/MlL0DbM2aS"
 5) "id"
 6) "1393089163856056320"
 7) "consumer"
 8) "Hot_96Kenya"
 9) "hashtags"
10) "EidMubarak,EidUlFitr"

Alright, its time to question tweets with RediSearch! Let’s use a couple of instructions to look the tweets-index index:

  • FT.SEARCH tweets-index hi there – will return tweets which
  • FT.SEARCH tweets-index hi there|world – its the identical as above, simply that it is relevant for “hi there” OR “world”
  • Use FT.SEARCH tweets-index "@location:India" for those who’re keen on tweets from a particular location
  • FT.SEARCH tweets-index "@consumer:jo* @location:India" – this combines location together with a standards that the username ought to begin with jo
  • FT.SEARCH tweets-index "@consumer:jo* | @location:India" – that is refined variant of the above. | signifies an OR standards
  • You possibly can search utilizing hash tags as nicely – FT.SEARCH tweets-index "@hashtags:cov*
  • Embrace a number of hashtags as such – FT.SEARCH tweets-index "@hashtags:Med*"

These are just some examples. I’d extremely advocate you to confer with the RediSearch documentation and check out different queries as nicely.

Let’s Scale Out

One of many key advantages of utilizing Redis Streams is to leverage its Shopper Teams characteristic. This implies you can merely add extra situations to the appliance (horizontal scale out) with a purpose to enhance the processing – the extra situations, the sooner the tweets will get processed. Every software will eat from a unique a part of the identical Redis Stream (tweets_stream), thus the workload is distributed (virtually) evenly amongst all of the situations – this provides you the flexibility to scale linearly.

Let’s do this out. To start out one other occasion, use docker run – be certain that to make use of a unique title:

docker run -d --name redis-streams-consumer_2 
-e STREAM_NAME=$STREAM_NAME 
-e STREAM_CONSUMER_GROUP_NAME=$STREAM_CONSUMER_GROUP_NAME 
-e REDIS_HOST=$REDIS_HOST 
-e REDIS_PORT=$REDIS_PORT 
-e REDIS_PASSWORD=$REDIS_PASSWORD 
-e SSL=$SSL 
abhirockzz/tweets-redis-streams-consumer-java

Discover that I used a unique title --name redis-streams-consumer_2

Issues will proceed like earlier than – just a bit sooner since we now have one other serving to hand. You possibly can verify the logs the brand new occasion as nicely – docker logs redis-streams-consumer_2.

You possibly can proceed to experiment additional and check out scaling out to extra situations.

Let’s Dig a Little Deeper

We will introspect Redis Streams utilizing the XPENDING command:

XPENDING tweets_stream redisearch_app_group

You will note an output just like this:

1) (integer) 25
2) "1618572598902-0"
3) "1618573768902-0"
4) 1) 1) "consumer-b6410cf9-8244-41ba-b0a5-d79b66d33d65"
      2) "20"
   2) 1) "consumer-e5a872d4-b488-416e-92ee-55d2902b338f"
      2) "5"

When you’re new to Redis Streams, this output may not make a whole lot of sense. The decision to XPENDING returns the no. of messages that had been acquired by our processing software, however have not been processed (and acknowledged) but. On this case, we now have two software situations (they randomly generate UUIDs) and have 20 and 5 unprocessed messages respectively (after all, the numbers will differ in your case).

In a manufacturing situation, software failures may occur as a consequence of a number of causes. Nevertheless, in our pattern app, the under code snippet was used to simulate this example – it randomly chooses (about 20% chance) to not course of the tweet acquired from Redis Streams:

if (!(random.nextInt(5) == 0)) 
    conn.hset(hashName, entry.getFields());
    conn.xack(streamName, consumerGroupName, entry.getID());

That is the explanation you will note XPENDING rely rising slowly however certainly. In manufacturing, if one (or extra) situations crash, the XPENDING rely for these occasion(s) will cease rising however stay fixed. It implies that, these messages are actually left unprocessed – on this particular instance, it signifies that the tweet data is not going to be out there in RediSearch so that you can question.

Redis Streams to the Rescue

Redis Streams gives dependable messaging. It shops the state for every client – that is precisely what you see with XPENDING! When you begin one other client occasion with the identical group and client title, it is possible for you to to replay the identical messages and re-process them to make sure that tweets are saved in Redis. This doesn’t contain doing something completely different/extra in your half.

Another choice is to have a devoted software that may periodically verify the patron group state (XPENDING), declare messages which have been left deserted, re-process and (most significantly) acknowledge (XACK) them. Within the subsequent (ultimate) a part of this collection, we’ll discover how one can construct an software to do precisely this!

So, How Does it All Work?

It is a good time to stroll by way of the code actual fast.

You possibly can confer with the code within the GitHub repo

The app makes use of JRediSearch which abstracts the API of the RediSearch module. The very first thing we do is set up a connection to Redis:

GenericObjectPoolConfig<Jedis> jedisPoolConfig = new GenericObjectPoolConfig<>();
JedisPool pool = new JedisPool(jedisPoolConfig, redisHost, Integer.valueOf(redisPort), timeout, redisPassword, isSSL);
Shopper redisearch = new Shopper(INDEX_NAME, pool);

Then we create a Schema and the Index definition.

        Schema sc = new Schema().addTextField(SCHEMA_FIELD_ID, 1.0).addTextField(SCHEMA_FIELD_USER, 1.0)
                .addTextField(SCHEMA_FIELD_TWEET, 1.0).addTextField(SCHEMA_FIELD_LOCATION, 1.0)
                .addTagField(SCHEMA_FIELD_HASHTAGS);

        IndexDefinition def = new IndexDefinition().setPrefixes(new String[]  INDEX_PREFIX );

        strive 
            boolean indexCreated = redisearch.createIndex(sc, Shopper.IndexOptions.defaultOptions().setDefinition(def));

            if (indexCreated) 
                System.out.println("Created RediSearch index ");
            
         catch (Exception e) 
            System.out.println("Didn't create RediSearch index - " + e.getMessage());
        

To discover the Redis Streams APIs (xgroupCreate, xreadGroup and so forth.) uncovered by the Jedis library, check out its javadocs

Earlier than transferring on, we create a Redis Streams Shopper group (utilizing xgroupCreate) – that is necessary. A client group represents a set of functions that work “collectively” and co-operate with one another to share the processing load:

strive 
    conn = pool.getResource();
    String res = conn.xgroupCreate(streamName, consumerGroupName, StreamEntryID.LAST_ENTRY, true);

Every app within the client group must be uniquely recognized. Whereas it’s attainable to assign a reputation manually, we generate a random client title.

String consumerName = "consumer-" + UUID.randomUUID().toString();

The principle a part of the patron app is loop that makes use of xreadGroup to learn from the Redis Stream. Discover the StreamEntryID.UNRECEIVED_ENTRY – which means we’ll are asking Redis to return stream entries which has not been acquired by any different client within the group. Additionally, our invocation blocks for 15 seconds and we decide to get a most of 50 messages per name to XREADGROUP (after all, you may change this as per necessities).

whereas (true) 

    Checklist<Entry<String, Checklist<StreamEntry>>> outcomes = conn.xreadGroup(consumerGroupName, consumerName, 50,
                        15000, false, Map.entry(streamName, StreamEntryID.UNRECEIVED_ENTRY));

    if (outcomes == null) 
        proceed;
    
    ....

Every stream entry must be saved to a Redis HASH (utilizing hset). The nice factor is that studying a stream entry returns a HashMap and that is precisely what HSET API expects as nicely. So we’re capable of re-use the HashMap!

That is not all although, discover the xack methodology – that is solution to name XACK and talk that we now have certainly processed the message efficiently:

                for (Entry<String, Checklist<StreamEntry>> end result : outcomes) 
                    Checklist<StreamEntry> entries = end result.getValue();
                    for (StreamEntry entry : entries) 
                        String tweetid = entry.getFields().get("id");
                        String hashName = INDEX_PREFIX + tweetid;

                        strive 
                            // simulate random failure/anomaly. ~ 20% will NOT be ACKed
                            if (!(random.nextInt(5) == 0)) 
                                conn.hset(hashName, entry.getFields());
                                conn.xack(streamName, consumerGroupName, entry.getID());
                            
                         catch (Exception e) 
                            proceed;
                        
                    
                

There’s a whole lot of scope for optimization right here. For e.g. you may make this course of multi-threaded by spawning a thread for every batch (say 50 messages)

That is all for this weblog!

within the Ultimate Half?

To date, we coated high-level overview partly 1, the tweets client Rust app partly 2, and a Java app to course of these tweets from Redis Streams. As promised, the ultimate a part of the collection will cowl an app to watch the method and re-process deserted messages with a purpose to maintain our general system sturdy – this might be a Serverless Go software deployed to Azure Capabilities. Keep tuned!


Supply hyperlink

About PARTH SHAH

Check Also

GVNDfDpbxFJkDRn5bdVyaU 1200 80

God of War Valkyries: locations and tips

The God of Battle Valkyries are the hardest bosses of the 9 realms. There are …

Leave a Reply

Your email address will not be published. Required fields are marked *

x