Eclipse Environment

The lab setup has provisioned an EC2 instance for you to run as the Producer in this scenario. You will now connect to that instance, clone the repository, and build and run the Producer code. This process is identical to the one you just completed on the Consumer EC2 instance, but the instructions are repeated here for convenience.

Setup Kinesis Producer

  1. In the AWS Console, open the CloudFormation page and, in the Outputs tab, locate the EC2InstanceKinesisProducer value. Copy it to your clipboard.

  2. In a new terminal window, use the command below as a template to connect to the instance:
ssh -i <path to your downloaded PEM file> ec2-user@<EC2InstanceKinesisProducer>

Example:

ssh -i ~/Downloads/ee-default-keypair.pem ec2-user@34.212.21.7
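If SSH rejects the key with an "UNPROTECTED PRIVATE KEY FILE" warning, tighten the key file's permissions first. A quick sketch using a stand-in file (apply the same `chmod 400` to your actual PEM path):

```shell
# Demonstrate on a stand-in key file; run the same chmod on your downloaded PEM.
touch /tmp/demo-keypair.pem
chmod 400 /tmp/demo-keypair.pem
# Confirm the permissions are now owner-read-only:
stat -c '%a' /tmp/demo-keypair.pem
```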
  3. Once connected via SSH, issue this command to clone the repository:
git clone https://git-codecommit.ap-northeast-1.amazonaws.com/v1/repos/KinesisProducer

You will be prompted for a username and password. You can obtain these from the CloudFormation Outputs tab: look for GitUsername and GitPassword.
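Re-entering these credentials for every git operation gets tedious. One optional approach is git's built-in credential cache, which keeps the HTTPS credentials in memory for a configurable time (shown here as one hour):

```shell
# Cache HTTPS credentials in memory for one hour so repeated git commands
# against CodeCommit do not re-prompt for GitUsername/GitPassword.
git config --global credential.helper 'cache --timeout=3600'
# Confirm the setting took effect:
git config --global credential.helper
```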

  4. The source code you checked in earlier from your workstation will now be cloned onto the EC2 instance; this will take a moment to complete.
  5. Build the source code using these commands:
cd KinesisProducer
/opt/apache-maven-3.3.9/bin/mvn package shade:shade

It will take a few moments to complete the build.

  6. The Producer needs source data: it expects a CSV file named SalesData.csv in the current working directory. Copy the file onto the EC2 instance using the wget command, specifying the file URL. You can get the file URL by right-clicking the following file and copying its link address: SalesData.csv.
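After downloading, a quick sanity check on the file can catch a bad or truncated download. The row below is hypothetical (the real column values will differ), laid out with the eight fields that the index mapping defined later in this guide expects:

```shell
# Hypothetical row with the eight fields the Elasticsearch mapping defines:
# timestamp, productLine, retailerType, productType, product, quantity, unitCost, totalCost
printf '1501545600000,Camping Equipment,Outdoors Shop,Tents,Star Dome,2,120.50,241.00\n' > /tmp/sample-row.csv
# Count the comma-separated fields per line; the same check works on SalesData.csv.
awk -F',' '{ print NF }' /tmp/sample-row.csv
```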

Configure Elasticsearch index mappings

When data is sent to Elasticsearch, it tries to infer the data types from the content, and it usually gets them right. In our case, however, we send the date/time of each sales record as a long (ticks since the epoch), so Elasticsearch would interpret this field as just a number (of type long). Instead, we need to tell Elasticsearch to interpret this timestamp field as a date. We do this by creating mappings in the index before we send any records.
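As a small illustration of why the mapping matters: a numeric epoch timestamp carries no visible date information on its own (the exact unit of the Producer's ticks is an assumption here; Elasticsearch's `date` type accepts epoch milliseconds by default):

```shell
# A bare "ticks since the epoch" value is just a number. For example,
# 1500000000 seconds after the Unix epoch resolves to a day in July 2017:
date -u -d @1500000000 +%Y-%m-%d
```

Without the mapping, Elasticsearch would store such values as plain long numbers, and date-based queries would not treat the field as a time axis.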

  1. In the terminal window with the SSH connection to the Kinesis Consumer EC2 instance, issue the following command, replacing <ElasticsearchEndpoint> with the ElasticsearchEndpoint value from the CloudFormation Outputs tab:
curl -X PUT "https://<ElasticsearchEndpoint>/transactions/" -k -H 'Content-Type: application/json' -d '{
	"mappings": {
		"transaction": {
			"properties": {
				"timestamp": {
					"type": "date"
				},
				"productLine": {
					"type": "string"
				},
				"retailerType": {
					"type": "string"
				},
				"productType": {
					"type": "string"
				},
				"product": {
					"type": "string"
				},
				"quantity": {
					"type": "long"
				},
				"unitCost": {
					"type": "double"
				},
				"totalCost": {
					"type": "double"
				}
			}
		}
	}
}'

If successful, you will see this response:

{"acknowledged":true}
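If you want to check this acknowledgement in a script rather than by eye, grepping the response for the flag is a simple option (the response is simulated below; the real one comes from the curl call above):

```shell
# Simulated index-creation response; the real one is returned by the PUT call.
response='{"acknowledged":true}'
# Assert success by looking for the acknowledgement flag:
echo "$response" | grep -o '"acknowledged":true' && echo "index created"
```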