Elasticsearch is a distributed, RESTful search and analytics engine capable of addressing a growing number of use cases. As the heart of the Elastic Stack, it centrally stores your data for lightning-fast search, fine-tuned relevance, and powerful analytics that scale easily. In many cases, Elasticsearch is used for real-time analysis and statistics on business data. WebSocket is often used for real-time event processing: it makes it easy to exchange data between client and server and lets the server push data to the client proactively. With the WebSocket API, the browser and server only need to complete a single handshake to establish a persistent connection that carries data in both directions. So how do we import WebSocket data into Elasticsearch?
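The handshake mentioned above is defined in RFC 6455. The client sends a random `Sec-WebSocket-Key` header, and the server confirms the upgrade by returning `Sec-WebSocket-Accept`, which is the Base64-encoded SHA-1 digest of that key concatenated with a fixed GUID. The sketch below computes that value for illustration only; websocket-client performs this step for us, and the function name `websocket_accept` is our own.

```python
import base64
import hashlib

# Fixed GUID defined by RFC 6455 for the opening handshake
WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def websocket_accept(sec_websocket_key: str) -> str:
    """Return the Sec-WebSocket-Accept value a server sends back for a client key."""
    digest = hashlib.sha1((sec_websocket_key + WEBSOCKET_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")


if __name__ == "__main__":
    # Example key/response pair taken from RFC 6455, section 1.3
    print(websocket_accept("dGhlIHNhbXBsZSBub25jZQ=="))  # s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```

Once the server answers with HTTP 101 and the matching accept value, the same TCP connection is used for WebSocket frames in both directions.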

In this article, we will use a Python application as a router for the WebSocket data. The architecture is: WebSocket server → Python application → Elasticsearch.

The Python application collects data over the WebSocket and converts it into documents that can be indexed into Elasticsearch. The real-time business data could be, for example, stock trades. The principle is the same as in our earlier exercise with MQTT.
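The "router" role can be sketched as a small dispatcher that decodes each incoming JSON message and hands it to a handler chosen by its `type` field. This is a hypothetical sketch: the `MessageRouter` class, the `"trade"` and `"ping"` message types, and the list used as a sink (standing in for Elasticsearch) are assumptions for illustration, not part of the original application.

```python
import json
from typing import Callable, Dict, List

Handler = Callable[[dict], None]


class MessageRouter:
    """Hypothetical router: decode WebSocket text messages and dispatch them by type."""

    def __init__(self) -> None:
        self.handlers: Dict[str, Handler] = {}
        self.unrouted: List[dict] = []

    def register(self, message_type: str, handler: Handler) -> None:
        self.handlers[message_type] = handler

    def route(self, raw_message: str) -> None:
        message = json.loads(raw_message)
        handler = self.handlers.get(message.get("type"))
        if handler is None:
            # Keep unknown messages aside instead of silently dropping them
            self.unrouted.append(message)
        else:
            handler(message)


if __name__ == "__main__":
    sink: List[dict] = []  # stands in for Elasticsearch
    router = MessageRouter()
    router.register("trade", sink.append)
    router.register("ping", lambda message: None)  # heartbeat: ignore
    router.route('{"type":"trade","data":[]}')
    router.route('{"type":"ping"}')
    print(len(sink), len(router.unrouted))  # 1 0
```

In a real application, `router.route` would be called from the WebSocket client's `on_message` callback, with the `"trade"` handler writing to Elasticsearch.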

 

Preparation

If you don’t already have Elasticsearch and Kibana installed, please refer to my previous post “Elastic: A Beginner’s Guide” to install Elasticsearch and Kibana.

In this exercise, I'll use the real-time WebSocket API provided by finnhub.io. To access the data, we first need to request an API key:

After you sign up, you can see your API key:

Let’s click on API Documentation above:

In the menu on the left, click Trades, then copy the Python example code and save it to a local file named finnhub-websockets.py:

finnhub-websockets.py

# https://pypi.org/project/websocket-client/
import websocket

def on_message(ws, message):
    # Print every message pushed by the server
    print(message)

def on_error(ws, error):
    print(error)

def on_close(ws, close_status_code, close_msg):
    # websocket-client 1.x passes the close status code and reason
    print("### closed ###")

def on_open(ws):
    # Subscribe to trades for each symbol once the connection is open
    ws.send('{"type":"subscribe","symbol":"AAPL"}')
    ws.send('{"type":"subscribe","symbol":"AMZN"}')
    ws.send('{"type":"subscribe","symbol":"BINANCE:BTCUSDT"}')
    ws.send('{"type":"subscribe","symbol":"IC MARKETS:1"}')

if __name__ == "__main__":
    # Log the handshake and frames for debugging
    websocket.enableTrace(True)
    # Replace <my-finnhub-token> with your own API key
    ws = websocket.WebSocketApp("wss://ws.finnhub.io?token=<my-finnhub-token>",
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close)
    ws.on_open = on_open
    ws.run_forever()
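Instead of hard-coding the token and writing each subscribe request by hand, the URL and the subscribe messages can be generated. This is a minimal sketch under two assumptions: the environment variable name `FINNHUB_TOKEN` and the helper names `finnhub_ws_url` and `subscribe_messages` are hypothetical, and the message format simply mirrors the `{"type":"subscribe","symbol":...}` strings used above.

```python
import json
import os
from typing import Iterable, List
from urllib.parse import urlencode

FINNHUB_WS_BASE = "wss://ws.finnhub.io"


def finnhub_ws_url(token: str) -> str:
    """Build the WebSocket URL with the API token as a query parameter."""
    return f"{FINNHUB_WS_BASE}?{urlencode({'token': token})}"


def subscribe_messages(symbols: Iterable[str]) -> List[str]:
    """Serialize one compact subscribe request per symbol, matching the format above."""
    return [
        json.dumps({"type": "subscribe", "symbol": symbol}, separators=(",", ":"))
        for symbol in symbols
    ]


if __name__ == "__main__":
    # FINNHUB_TOKEN is an assumed variable name; set it in your shell before running
    token = os.environ.get("FINNHUB_TOKEN", "<my-finnhub-token>")
    print(finnhub_ws_url(token))
    for message in subscribe_messages(["AAPL", "AMZN", "BINANCE:BTCUSDT", "IC MARKETS:1"]):
        print(message)
```

In `on_open`, the loop would call `ws.send(message)` for each entry, and `finnhub_ws_url(token)` would replace the literal URL passed to `websocket.WebSocketApp`. Keeping the token out of the source file also avoids leaking it when the script is shared.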

Let’s install websocket-client:

pip3 install websocket-client

Then run the Python application above:

python3 finnhub-websockets.py

We should see output similar to the following:

OK, it looks like the API is working.

 

Import data to Elasticsearch

Now let's import the data into Elasticsearch. The Python client is documented at elasticsearch-py.readthedocs.io/en/v7.10.1/. First, install the elasticsearch package:

pip3 install elasticsearch

For background, see my previous article "Getting Started with Elasticsearch development – Python". Set up the Elasticsearch connection as described in that article, then modify finnhub-websockets.py as follows:

finnhub-websockets.py


# https://pypi.org/project/websocket-client/
import datetime
import json

import websocket
from elasticsearch import Elasticsearch

# Connection to Elasticsearch; adjust the URL (and credentials) for your deployment
es = Elasticsearch("http://localhost:9200")

def on_message(ws, message):
    message_json = json.loads(message)
    # Add the time the message was received, in UTC (ISO 8601)
    message_json["@timestamp"] = datetime.datetime.now(datetime.timezone.utc).isoformat()
    # Index each message as one document in the websockets-data index
    es.index(index="websockets-data", body=message_json)
    print(message_json)

def on_error(ws, error):
    print(error)

def on_close(ws, close_status_code, close_msg):
    # websocket-client 1.x passes the close status code and reason
    print("### closed ###")

def on_open(ws):
    # Subscribe to trades for each symbol once the connection is open
    ws.send('{"type":"subscribe","symbol":"AAPL"}')
    ws.send('{"type":"subscribe","symbol":"AMZN"}')
    ws.send('{"type":"subscribe","symbol":"TSLA"}')
    ws.send('{"type":"subscribe","symbol":"ESTC"}')

if __name__ == "__main__":
    websocket.enableTrace(True)
    # Replace <my-finnhub-token> with your own API key
    ws = websocket.WebSocketApp("wss://ws.finnhub.io?token=<my-finnhub-token>",
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close)
    ws.on_open = on_open
    ws.run_forever()

Replace <my-finnhub-token> in the code with your own token. A few notes on the code:

  1. The es variable holds the connection to Elasticsearch. Change the address and port to match your deployment and, if security is enabled, supply the appropriate credentials.
  2. In on_message, we add the current time as @timestamp, so every document records when it was received. This lets us analyze the data over time.
  3. The data is stored in the websockets-data index in Elasticsearch.
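The receive time in note 2 can differ from the moment the trade actually happened, and each message may contain several trades. One alternative is to split a message into one document per trade and take `@timestamp` from the trade itself. This sketch assumes the Finnhub trade message shape `{"type": "trade", "data": [{"s": ..., "p": ..., "t": ..., "v": ...}]}` with `t` in UNIX milliseconds (please verify against the Trades section of the Finnhub documentation); the function name `trades_to_documents` is hypothetical.

```python
import datetime
from typing import Any, Dict, List


def trades_to_documents(message: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Split one trade message into one document per trade.

    Assumes {"type": "trade", "data": [...]} where each trade has "t" in UNIX
    milliseconds. Messages of any other type yield no documents.
    """
    if message.get("type") != "trade":
        return []
    documents = []
    for trade in message.get("data", []):
        doc = dict(trade)
        # Use the trade time reported by the feed instead of our receive time
        traded_at = datetime.datetime.fromtimestamp(trade["t"] / 1000, tz=datetime.timezone.utc)
        doc["@timestamp"] = traded_at.isoformat()
        documents.append(doc)
    return documents


if __name__ == "__main__":
    # Illustrative values only, not real market data
    sample = {"type": "trade", "data": [{"s": "AAPL", "p": 130.5, "t": 1609459200000, "v": 10}]}
    print(trades_to_documents(sample))
```

Inside `on_message`, each returned document would then be indexed separately, which makes per-symbol aggregations in Kibana straightforward.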

Let’s rerun our application:

python3 finnhub-websockets.py 

We should now see a steady stream of data being imported into Elasticsearch.
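Calling `es.index` once per message is simple, but it sends one HTTP request per document. At higher message rates, documents can be batched and sent with `elasticsearch.helpers.bulk`, which accepts an iterable of action dicts containing `_index` and `_source`. The sketch below only builds and buffers the batches so that it runs without a cluster; the `BulkBuffer` class and the batch size of 500 are hypothetical choices.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, List

INDEX_NAME = "websockets-data"
Action = Dict[str, Any]


def to_bulk_actions(documents: Iterable[Dict[str, Any]], index: str = INDEX_NAME) -> Iterator[Action]:
    """Wrap each document in the action format accepted by elasticsearch.helpers.bulk."""
    for doc in documents:
        yield {"_index": index, "_source": doc}


class BulkBuffer:
    """Hypothetical helper: collect documents and hand them to `send` in batches."""

    def __init__(self, send: Callable[[List[Action]], Any], batch_size: int = 500) -> None:
        self.send = send
        self.batch_size = batch_size
        self.pending: List[Dict[str, Any]] = []

    def add(self, doc: Dict[str, Any]) -> None:
        self.pending.append(doc)
        if len(self.pending) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        # Send whatever is buffered, even a partial batch
        if self.pending:
            self.send(list(to_bulk_actions(self.pending)))
            self.pending = []
```

With a real client this might be wired up as `buffer = BulkBuffer(lambda actions: helpers.bulk(es, actions))` after `from elasticsearch import helpers`, calling `buffer.add(message_json)` in `on_message` and `buffer.flush()` in `on_close` so the last partial batch is not lost. Batching trades latency for throughput: documents only become searchable after their batch is sent.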

In Kibana Dev Tools, we can check that the new websockets-data index has been created with the following command:

GET  _cat/indices
yellow open websockets-data                 n3RU2Ze8Rj-hVi3a8H3-zw 1 1  1   0    4kb    4kb
green  open .apm-custom-link                Fqq-lxCiQHKib8kxdO0uoQ 1 0  0   0   208b   208b
green  open .kibana_task_manager_1          29ilRYTkSOSx1aFtR0DUWQ 1 0  5 213 89.3kb 89.3kb
green  open .apm-agent-configuration        YY8-sBN8TBWAC4R_L1-8QQ 1 0  0   0   208b   208b
green  open .kibana-event-log-7.10.0-000001 G7vkPKUHQiqxfpJDGnvKmw 1 0  1   0  5.6kb  5.6kb
green  open .kibana_1                       oF471rX0R8Cu4H1tvE813Q 1 0 18   2 10.4mb 10.4mb
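Without the `v` parameter, `_cat/indices` prints no header row. In Elasticsearch 7.x the default columns are, in order: health, status, index, uuid, primary shards, replicas, document count, deleted documents, total store size, and primary store size. The websockets-data index is yellow presumably because its one replica (rep = 1) cannot be allocated on a single-node cluster. The sketch below labels the columns by parsing the plain-text output; the column list is the 7.x default and is an assumption for other versions (adding `?v` prints headers, and `?format=json` returns structured output directly).

```python
from typing import Dict, List

# Default column order of GET _cat/indices in Elasticsearch 7.x (may differ in other versions)
CAT_INDICES_COLUMNS = [
    "health", "status", "index", "uuid", "pri", "rep",
    "docs.count", "docs.deleted", "store.size", "pri.store.size",
]


def parse_cat_indices(text: str) -> List[Dict[str, str]]:
    """Turn the plain-text output of GET _cat/indices into one dict per index."""
    rows = []
    for line in text.splitlines():
        fields = line.split()
        # Skip blank or malformed lines that do not have exactly one value per column
        if len(fields) == len(CAT_INDICES_COLUMNS):
            rows.append(dict(zip(CAT_INDICES_COLUMNS, fields)))
    return rows


if __name__ == "__main__":
    output = "yellow open websockets-data n3RU2Ze8Rj-hVi3a8H3-zw 1 1 1 0 4kb 4kb"
    for row in parse_cat_indices(output):
        print(row["index"], row["health"], row["docs.count"])  # websockets-data yellow 1
```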

We can then create an index pattern for this index:

At the time of writing, the US market was closed, so no trade data was arriving over the WebSocket. During trading hours, the WebSocket pushes data continuously and our application writes it to Elasticsearch. The index pattern contains the following fields:

The definitions of these fields can be found in the Trades section of the Finnhub API documentation.
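By default, Elasticsearch infers field types from the documents it receives (dynamic mapping): strings such as the symbol become `text` with a `.keyword` sub-field, and the millisecond trade time `t` becomes a plain `long` rather than a date. One option is to create the index with an explicit mapping before starting the application. This sketch assumes the document layout written by `on_message` (the raw message plus `@timestamp`) and the Finnhub trade fields `s` (symbol), `p` (price), `t` (UNIX time in ms), `v` (volume) and `c` (trade conditions); the chosen types and the function name `websockets_index_body` are suggestions, not part of the original setup.

```python
import json
from typing import Any, Dict

# Assumed Finnhub trade fields and suggested Elasticsearch types
TRADE_FIELDS: Dict[str, Dict[str, Any]] = {
    "s": {"type": "keyword"},                         # symbol
    "p": {"type": "double"},                          # last price
    "v": {"type": "double"},                          # volume
    "t": {"type": "date", "format": "epoch_millis"},  # trade time in UNIX ms
    "c": {"type": "keyword"},                         # trade conditions
}


def websockets_index_body() -> Dict[str, Any]:
    """Hypothetical create-index body matching the documents written by on_message."""
    return {
        "mappings": {
            "properties": {
                "@timestamp": {"type": "date"},
                "type": {"type": "keyword"},
                "data": {"properties": TRADE_FIELDS},
            }
        }
    }


if __name__ == "__main__":
    print(json.dumps(websockets_index_body(), indent=2))
```

With the 7.x Python client, the body could be passed as `es.indices.create(index="websockets-data", body=websockets_index_body())`. This call fails if the index already exists, so it belongs in a one-time setup step rather than in `on_message`.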

 

We can use Lens in Kibana for real-time analysis of the stock data:
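Lens builds its visualizations from Elasticsearch aggregations. As a rough equivalent outside Kibana, the hypothetical search body below computes the average price per symbol over the last 15 minutes. It assumes one trade per document with top-level `s` and `p` fields and dynamic mapping (hence `s.keyword`). With the layout written by `on_message` above, where one message can carry trades for several symbols in `data`, a per-symbol average over `data.p` could mix prices of different symbols, because arrays of objects are flattened unless mapped as `nested`.

```python
import json
from typing import Any, Dict


def avg_price_per_symbol_query(window: str = "now-15m",
                               symbol_field: str = "s.keyword",
                               price_field: str = "p") -> Dict[str, Any]:
    """Hypothetical search body: average trade price per symbol within a time window."""
    return {
        "size": 0,  # return only aggregations, no individual hits
        "query": {"range": {"@timestamp": {"gte": window}}},
        "aggs": {
            "by_symbol": {
                "terms": {"field": symbol_field, "size": 10},
                "aggs": {"avg_price": {"avg": {"field": price_field}}},
            }
        },
    }


if __name__ == "__main__":
    print(json.dumps(avg_price_per_symbol_query(), indent=2))
```

The printed JSON can be sent from Kibana Dev Tools as the body of `GET websockets-data/_search`, or with the 7.x Python client as `es.search(index="websockets-data", body=avg_price_per_symbol_query())`.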

 

Conclusion

In this article, we showed how to use a Python application as a router to import real-time data from a WebSocket into Elasticsearch and analyze it with Kibana.