Trường hợp của chúng ta mô phỏng sử dụng mẫu dữ liệu bán hàng thực tế và được xử lý qua Kinesis stream và được ghi vào ElasticSearch nơi chúng ta có thể phân tích dữ liệu bán hàng bằng đồ thị với Kibana. Dữ liệu bán hàng sẽ được đọc từ tệp CSV và có chi phí, số lượng và ngày giao dịch được áp dụng ngẫu nhiên. Kiến trúc trông như thế này:
KinesisConsumer
Sample application that consumes messages from a Kinesis stream and writes them to an ElasticSearch domain index
DevAx-05
vào ô tìm kiếm, nhấn EnterDevAx-05
vào ô tìm kiếm, nhấn Entergit checkout -b master
21. Chạy lệnh dưới đây để thêm các tập tin mới vào kho lưu trữ
git add .
22. Chạy lệnh dưới đây để commit tập tin mới
git commit -m "Initial commit"
23. Trong Eclipse IDE, chuột phải vào kho dữ liệu KinesisConsumer
KinesisConsumer
vào ô tìm kiếmTrong thiết lập bài thực hành đã cung cấp một EC2 instance để bạn chạy với tư cách Consumer trong tình huống này. Bây giờ bạn sẽ kết nối với EC2 instance, sao chép kho lưu trữ về, build và chạy chương trình Consumer.
Để kết nối an toàn với cơ sở hạ tầng đám mây của bạn, bạn sẽ sử dụng SSH. SSH yêu cầu một cặp public/private key-pair được thiết lập và cài đặt trên máy client và server sẽ giao tiếp thông qua giao thức SSH. Các EC2 instance đã được khởi chạy với một public key. Bạn cần truy xuất private key để có thể sử dụng khóa đó để thiết lập kết nối an toàn.
Hãy đảm bảo bạn đã có key-pair được thiết lập khi triển khai CloudFormation. Lưu tập tin khóa này trên máy của bạn.
chmod 400 <path to your key-pair file>
30. Chạy lệnh dưới đây
ssh -i <path to your key-pair file> ec2-user@<EC2InstanceKinesisConsumer>
Thay <EC2InstanceKinesisConsumer> bằng giá trị EC2InstanceKinesisConsumer trong tab Output của stack DevAx-05
git clone https://git-codecommit.<your_region>.amazonaws.com/v1/repos/KinesisConsumer
Thay <your_region> bằng Region của bạn
cd KinesisConsumer
/opt/apache-maven-3.3.9/bin/mvn package shade:shade
33. Sẽ mất một chút thời gian để build mã nguồn ứng dụng. 34. Khi quá trình build hoàn tất, chạy ứng dụng Kinesis Consumer bằng lệnh:
java -jar target/KinesisConsumerApp-1.0.0.jar
Ứng dụng sẽ khởi động và chờ các sự kiện được gửi vào Kinesis stream. Bạn sẽ không thấy lỗi nào và sau một lúc, đầu ra của bảng điều khiển sẽ hiển thị Sleeping… vì không có bản ghi nào trong luồng để xử lý:
Bây giờ bạn sẽ lặp lại quá trình trên, lần này, tạo một kho lưu trữ cho ứng dụng Kinesis Producer và sao chép chúng vào Kinesis Producer EC2 instance. Sau khi hoàn tất, bạn có thể bắt đầu thử nghiệm truyền dữ liệu giữa hai máy thông qua Amazon Kinesis.
Quá trình tạo kho lưu trữ giống như các bước trước, nhưng chúng ta sẽ lặp lại nó ở đây để tiện theo dõi.
KinesisProducer
Sample application that publishes sales events into a Kinesis stream
DevAx-05
vào ô tìm kiếm, nhấn Entergit checkout -b master
git add .
git commit -m "Initial commit"
KinesisProducer
vào ô tìm kiếmchmod 400 <path to your key-pair file>
ssh -i <path to your key-pair file> ec2-user@<EC2InstanceKinesisProducer>
Thay <EC2InstanceKinesisProducer> bằng giá trị EC2InstanceKinesisProducer trong tab Output của stack DevAx-05
git config --global credential.helper '!aws codecommit credential-helper $@'
git config --global credential.UseHttpPath true
git clone https://git-codecommit.<your_region>.amazonaws.com/v1/repos/KinesisProducer
Thay <your_region> bằng Region của bạn
cd KinesisProducer
/opt/apache-maven-3.3.9/bin/mvn package shade:shade
wget https://workshops.devax.academy/monoliths-to-microservices/module5/files/SalesData.csv
Khi dữ liệu được gửi đến ElasticSearch, nó sẽ cố gắng suy ra các kiểu dữ liệu trong nội dung và thường thực hiện tốt việc xác định đúng các kiểu dữ liệu. Trong trường hợp của chúng ta, chúng ta đang gửi ngày/giờ của mỗi bản ghi bán hàng dưới kiểu long và vì vậy ElasticSearch sẽ diễn giải trường này chỉ là một số (kiểu long). Thay vào đó, chúng ta cần cảnh báo ElasticSearch rằng nó sẽ diễn giải trường dấu thời gian này là Date. Chúng ta thực hiện việc này bằng cách tạo các Mapping trong chỉ mục trước khi chúng ta gửi bất kỳ bản ghi nào. 65. Trên cửa sổ dòng lệnh, bạn có kết nối SSH tới Kinesis Consumer EC2 instance, Chạy lệnh dưới đây
curl -X PUT "https://<ElasticsearchEndpoint>/transactions/" -k -d '{
"mappings": {
"transaction": {
"properties": {
"timestamp": {
"type": "date"
},
"productLine": {
"type": "string"
},
"retailerType": {
"type": "string"
},
"productType": {
"type": "string"
},
"product": {
"type": "string"
},
"quantity": {
"type": "long"
},
"unitCost": {
"type": "double"
},
"totalCost": {
"type": "double"
}
}
}
}
}';
{“acknowledged”:true}