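Our example use case simulates sales data coming in from the field. The data is processed via a Kinesis stream and written out to ElasticSearch, where we can analyse it graphically with Kibana. The sales data will be read from a CSV file and have costs, quantities and transaction dates randomly applied. In this architecture, a producer application running on one EC2 instance publishes sales events into the Kinesis stream, and a consumer application on a second EC2 instance reads them from the stream and writes them to an ElasticSearch domain index.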
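The producer's job of turning a CSV row into a sales event can be sketched in Python as follows. This is not the KinesisProducer source, which is a Java application. The CSV column order, the random value ranges, and the helper names `make_sale_event` and `events_from_csv` are hypothetical. The field names match the ElasticSearch mapping created later in this module. The timestamp is assumed to be a long count of milliseconds since the Unix epoch.

```python
import csv
import io
import json
import random
import time

# Hypothetical column order for SalesData.csv; the real file may differ.
COLUMNS = ["retailerType", "productLine", "productType", "product"]


def make_sale_event(row, rng, now_ms):
    """Turn one CSV row into a sales event with random quantity, cost and date."""
    event = dict(zip(COLUMNS, row))
    quantity = rng.randint(1, 100)                 # hypothetical range
    unit_cost = round(rng.uniform(1.0, 500.0), 2)  # hypothetical range
    event["quantity"] = quantity
    event["unitCost"] = unit_cost
    event["totalCost"] = round(quantity * unit_cost, 2)
    # Random instant within the last 30 days, as epoch milliseconds (a long).
    event["timestamp"] = now_ms - rng.randint(0, 30 * 24 * 3600 * 1000)
    return event


def events_from_csv(text, seed=42):
    """Parse CSV text and build one event per row, using a seeded RNG."""
    rng = random.Random(seed)
    now_ms = int(time.time() * 1000)
    return [make_sale_event(row, rng, now_ms) for row in csv.reader(io.StringIO(text))]


if __name__ == "__main__":
    # Placeholder row, not taken from the real SalesData.csv.
    for event in events_from_csv("RetailerA,LineA,TypeA,ProductA\n"):
        print(json.dumps(event))
```

Each event would then be serialized to JSON and published to the stream, one record per sale.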
Repository name: KinesisConsumer
Description: Sample application that consumes messages from a Kinesis stream and writes them to an ElasticSearch domain index
Type DevAx-05 into the search bar and press Enter
git checkout -b master
21. Execute the below command to add the new file
git add .
22. Execute the below command to commit the new file
git commit -m "Initial commit"
23. In the Eclipse IDE, right-click the KinesisConsumer repository
Type KinesisConsumer into the search bar
The lab setup has provisioned an EC2 instance for you to run as the Consumer in this scenario. You will now connect to the EC2 instance, clone the repository, and build and run the Consumer code.
In order to connect securely to your cloud infrastructure, you will use SSH (Secure Shell). SSH key-pair authentication requires the public key to be installed on the server and the matching private key to be available on the client. The EC2 instances have been launched with a public key, so you need to retrieve the private key and use it to establish a secure connection. Click Download PEM and save the private key file on your local machine.
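Restrict the permissions on the key file so that only you can read it, because SSH refuses to use a private key that other users can access:
chmod 400 <path to your key-pair file>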
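The permission rule behind this step can be sketched in Python. `chmod 400` leaves the owner with read access only. OpenSSH rejects a private key file whose group or "other" permission bits are set. The helper names `restrict_key` and `key_is_private` are hypothetical.

```python
import os
import stat


def restrict_key(path):
    """Python equivalent of `chmod 400 <path>`: owner read-only, no other access."""
    os.chmod(path, stat.S_IRUSR)


def key_is_private(path):
    """True when no group/other permission bits are set, which OpenSSH requires."""
    mode = stat.S_IMODE(os.stat(path).st_mode)
    return (mode & 0o077) == 0
```

For example, `restrict_key("devax-key.pem")` followed by `key_is_private("devax-key.pem")` would return `True`. The file name here is only an illustration.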
30. Execute the below command to connect to the Kinesis Consumer EC2 instance
ssh -i <path to your key-pair file> ec2-user@<EC2InstanceKinesisConsumer>
Replace <EC2InstanceKinesisConsumer> with the value of EC2InstanceKinesisConsumer in the Outputs tab of the DevAx-05 stack
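If you prefer to look up the stack output programmatically, the sketch below reads it from a CloudFormation `DescribeStacks` response and builds the SSH command. The helper names `stack_output` and `ssh_command`, the key file name, and the host name are hypothetical. The response shape (`Stacks`, `Outputs`, `OutputKey`, `OutputValue`) follows the CloudFormation API. The example uses a hand-written response so it runs without AWS credentials.

```python
import shlex


def stack_output(response, key):
    """Return the OutputValue for `key` from a DescribeStacks response."""
    for output in response["Stacks"][0].get("Outputs", []):
        if output["OutputKey"] == key:
            return output["OutputValue"]
    raise KeyError(f"stack has no output named {key!r}")


def ssh_command(key_file, host, user="ec2-user"):
    """Build an `ssh -i <key> ec2-user@<host>` command line, quoting the key path."""
    return f"ssh -i {shlex.quote(key_file)} {user}@{host}"


if __name__ == "__main__":
    # Hand-written stand-in for a real DescribeStacks response (hypothetical host).
    response = {"Stacks": [{"StackName": "DevAx-05", "Outputs": [
        {"OutputKey": "EC2InstanceKinesisConsumer",
         "OutputValue": "ec2-198-51-100-7.compute-1.amazonaws.com"},
    ]}]}
    host = stack_output(response, "EC2InstanceKinesisConsumer")
    print(ssh_command("devax-key.pem", host))
```

With boto3 installed and credentials configured, the real response would come from `boto3.client("cloudformation").describe_stacks(StackName="DevAx-05")`. The same lookup works for `EC2InstanceKinesisProducer`.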
git clone https://git-codecommit.<your_region>.amazonaws.com/v1/repos/KinesisConsumer
Replace <your_region> with your AWS Region (for example, us-east-1)
cd KinesisConsumer
/opt/apache-maven-3.3.9/bin/mvn package shade:shade
33. It will take a few moments to complete the build.
34. When the build is complete, start the Kinesis Consumer application by executing the below command
java -jar target/KinesisConsumerApp-1.0.0.jar
The application will start and wait for events to be published into the Kinesis stream. You should see no errors. After a few moments, the console output should show Sleeping… because there are no records in the stream to process.
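A consumer that waits for events typically polls the stream and sleeps whenever a poll returns nothing. The sketch below shows that pattern in Python. It is not the KinesisConsumer code, and the `consume` function and its `fetch` callback are hypothetical. `fetch` takes a shard iterator and returns the records plus the next iterator, mirroring the shape of the Kinesis `GetRecords` API, so the loop can be exercised with a fake stream.

```python
import time
from typing import Callable, List, Optional, Tuple

# fetch(iterator) -> (records, next_iterator); next_iterator is None once the shard is closed.
Fetch = Callable[[str], Tuple[List[dict], Optional[str]]]


def consume(fetch: Fetch, iterator: Optional[str], handle: Callable[[dict], None],
            idle_sleep: float = 1.0, max_polls: Optional[int] = None,
            sleep: Callable[[float], None] = time.sleep,
            log: Callable[[str], None] = print) -> int:
    """Poll until the shard closes (or max_polls is reached); return the number of polls."""
    polls = 0
    while iterator is not None and (max_polls is None or polls < max_polls):
        records, iterator = fetch(iterator)
        polls += 1
        if records:
            for record in records:
                handle(record)
        else:
            # Nothing to process yet: back off before polling again.
            log("Sleeping...")
            sleep(idle_sleep)
    return polls


if __name__ == "__main__":
    # Fake stream: one empty poll, one poll with a record, then the shard closes.
    pages = [([], "it-1"), ([{"Data": b'{"product": "ProductA"}'}], "it-2"), ([], None)]
    consume(lambda it: pages.pop(0), "it-0", print, sleep=lambda s: None)
```

Against a real stream, `fetch` could be a small function that calls boto3's `kinesis.get_records(ShardIterator=it)` and returns `resp["Records"]` and `resp.get("NextShardIterator")`. The starting iterator would come from `get_shard_iterator(..., ShardIteratorType="LATEST")`.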
You will now repeat the process above, this time creating a repository for the Kinesis Producer application and cloning it onto the Kinesis Producer EC2 instance. Once this is complete, you can begin testing data transfer between the two machines via Amazon Kinesis.
The process of creating the repository is the same as in the previous steps, but we repeat it here for convenience.
Repository name: KinesisProducer
Description: Sample application that publishes sales events into a Kinesis stream
Type DevAx-05 into the search bar and press Enter
git checkout -b master
git add .
git commit -m "Initial commit"
Type KinesisProducer into the search bar
chmod 400 <path to your key-pair file>
ssh -i <path to your key-pair file> ec2-user@<EC2InstanceKinesisProducer>
Replace <EC2InstanceKinesisProducer> with the value of EC2InstanceKinesisProducer in the Outputs tab of the DevAx-05 stack
Configure Git to authenticate to AWS CodeCommit through the AWS CLI credential helper:
git config --global credential.helper '!aws codecommit credential-helper $@'
git config --global credential.UseHttpPath true
git clone https://git-codecommit.<your_region>.amazonaws.com/v1/repos/KinesisProducer
Replace <your_region> with your AWS Region (for example, us-east-1)
cd KinesisProducer
/opt/apache-maven-3.3.9/bin/mvn package shade:shade
Download the sample sales data file into the current (KinesisProducer) directory:
wget https://workshops.devax.academy/monoliths-to-microservices/module5/files/SalesData.csv
When data is sent to ElasticSearch, it tries to infer the data type of each field, and it usually gets them right. In our case, however, we send the date/time of each sales record as a long epoch timestamp, so ElasticSearch would map this field as a plain number (type long). Instead, we need to tell ElasticSearch to interpret the timestamp field as a date. We do this by creating the index with an explicit mapping before we send any records.
65. In the terminal window with the SSH connection to the Kinesis Consumer EC2 instance, execute the below command, replacing <ElasticsearchEndpoint> with the endpoint of your ElasticSearch domain
curl -X PUT "https://<ElasticsearchEndpoint>/transactions/" -k -H 'Content-Type: application/json' -d '{
  "mappings": {
    "transaction": {
      "properties": {
        "timestamp": {
          "type": "date"
        },
        "productLine": {
          "type": "string"
        },
        "retailerType": {
          "type": "string"
        },
        "productType": {
          "type": "string"
        },
        "product": {
          "type": "string"
        },
        "quantity": {
          "type": "long"
        },
        "unitCost": {
          "type": "double"
        },
        "totalCost": {
          "type": "double"
        }
      }
    }
  }
}';
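ElasticSearch should respond with:
{"acknowledged":true}
This mapping syntax targets older Elasticsearch releases. The string field type was superseded by text and keyword in Elasticsearch 5.0, and mapping types such as transaction were deprecated in 7.0, so on a newer domain this request body will need adjusting.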
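The same index-creation request can be sketched in Python. The sketch makes the reason for the mapping visible: an epoch number is just a long until the field is declared as a date. The helper names `mapping_body`, `build_put_request` and `epoch_millis_to_iso` are hypothetical. The sketch assumes the timestamps are epoch milliseconds, which matches Elasticsearch's default date parsing.

```python
import json
import urllib.request
from datetime import datetime, timezone

# Same field types as the curl request ("string" is the pre-5.0 Elasticsearch text type).
FIELD_TYPES = {
    "timestamp": "date",  # without this, an epoch number would be inferred as "long"
    "productLine": "string",
    "retailerType": "string",
    "productType": "string",
    "product": "string",
    "quantity": "long",
    "unitCost": "double",
    "totalCost": "double",
}


def mapping_body(doc_type="transaction"):
    """Build the index-creation body: {"mappings": {doc_type: {"properties": ...}}}."""
    properties = {field: {"type": t} for field, t in FIELD_TYPES.items()}
    return {"mappings": {doc_type: {"properties": properties}}}


def build_put_request(endpoint, index="transactions"):
    """Prepare (but do not send) the PUT request that creates the index with the mapping."""
    return urllib.request.Request(
        f"https://{endpoint}/{index}/",
        data=json.dumps(mapping_body()).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def epoch_millis_to_iso(ms):
    """Show the instant an epoch-milliseconds value stands for once mapped as a date."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc).isoformat()
```

Sending it would be `urllib.request.urlopen(build_put_request("<ElasticsearchEndpoint>"))`. Unlike `curl -k`, urllib verifies the TLS certificate by default. For example, `epoch_millis_to_iso(1500000000000)` returns `"2017-07-14T02:40:00+00:00"`.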