Trường hợp của chúng ta mô phỏng sử dụng mẫu dữ liệu bán hàng thực tế và được xử lý qua Kinesis stream và được ghi vào ElasticSearch nơi chúng ta có thể phân tích dữ liệu bán hàng bằng đồ thị với Kibana. Dữ liệu bán hàng sẽ được đọc từ tệp CSV và có chi phí, số lượng và ngày giao dịch được áp dụng ngẫu nhiên. Kiến trúc trông như thế này:
Nội dung
KinesisConsumer
Sample application that consumes messages from a Kinesis stream and writes them to an ElasticSearch domain index
.Chọn Create repository. Kho lưu trữ mới của bạn sẽ được tạo, sẵn sàng cho bạn sử dụng. Chọn vào Skip để đóng màn hình tiếp theo. Chọn vào Close để đóng bảng điều khiển xuất hiện sau khi tạo kho lưu trữ.
Trong Eclipse IDE, mở Git Perspective. Nếu bạn chưa có khung thông tin này, hãy sử dụng lựa chọn menu Window | Perspective | Open Perspective | Git repositories.
Chọn biểu tượng Clone repository
Kho lưu trữ trống sẽ được sao chép vào đường dẫn local của bạn đã chỉ định.
public static final String ELASTICSEARCH_ENDPOINT
trung với thông tin từ tab Cloudformation Output, gọi đến ElasticsearchEndpoint. Mặc định, giá trị sẽ được thiết lập ở vị trí REPLACE_WITH_YOUR_ELASTIC_SEARCH_ENDPOINT
. Xóa và thay thế với giá trị ElasticsearchEndpoint của bạn. Lưu lại tập tin.cd
vào thư mục tải về từ kho dữ liệu của bạn.git checkout -b master
git add .
git commit -m "Initial commit"
Nếu tùy chọn này không xuất hiện hoặc không có branch Master, bạn có thể cần làm mới để cập nhật IDE.
Bạn sẽ thấy các tập tin mà bạn đã thêm vào kho lưu trữ được liệt kê trong giao diện trình duyệt.
Trong thiết lập bài thực hành đã cung cấp một EC2 instance để bạn chạy với tư cách Consumer trong tình huống này. Bây giờ bạn sẽ kết nối với EC2 instance, sao chép kho lưu trữ về, build và chạy chương trình Consumer.
Hãy đảm bảo bạn đã có key-pair được thiết lập khi triển khai CloudFormation. Lưu tập tin khóa này trên máy của bạn.
chmod 400 <path to your downloaded PEM file>
Ví dụ:
chmod 400 ~/Downloads/ee-default-keypair.pem
Để mở cửa sổ lệnh, bạn sẽ quay lại chúng một chút nữa.
ssh -i <path to your downloaded PEM file> ec2-user@<EC2InstanceKinesisConsumer>
Ví dụ:
ssh -i ~/Downloads/ee-default-keypair.pem ec2-user@34.212.21.7
Đừng thêm vào < và >.Nếu bạn có khó khăn ở bước này, hãy tìm hướng dẫn. Khi đã hoàn thành việc đăng nhập vào instance, tiếp tục các bước bên dưới.
git clone https://git-codecommit.ap-northeast-1.amazonaws.com/v1/repos/KinesisConsumer
Bạn sẽ bị hỏi username và password. Bạn có thể lấy thông tin này từ tab Cloudformation Outputs - tìm thông tin GitUsername và GitPassword.
Bạn sẽ nhận được phản hồi sau
Cloning into 'KinesisConsumer'...
remote: Counting objects: 18, done.
Unpacking objects: 100% (18/18), done.
cd
vào thư mục KinesisConsumer
và thực thi lệnh:cd KinesisConsumer
/opt/apache-maven-3.3.9/bin/mvn package shade:shade
Sẽ mất một chút thời gian để build mã nguồn ứng dụng.
java -jar target/KinesisConsumerApp-1.0.0.jar
Ứng dụng sẽ khởi động và chờ các sự kiện được gửi vào Kinesis stream. Bạn sẽ không thấy lỗi nào và sau một lúc, đầu ra của bảng điều khiển sẽ hiển thị Sleeping...
vì không có bản ghi nào trong luồng để xử lý:
INFO: Initializing record processor for shard: shardId-000000000000
Aug 21, 2017 11:30:19 AM idevelop.demo.RecordProcessor initialize
INFO: Initializing record processor for shard: shardId-000000000001
Aug 21, 2017 11:30:57 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Current stream shard assignments: shardId-000000000001, shardId-000000000000
Aug 21, 2017 11:30:57 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker info
INFO: Sleeping ...
Dừng việc thực thi trong cửa sổ lệnh bằng tổ hợp phím CTRL + C
Bây giờ bạn sẽ lặp lại quá trình trên, lần này, tạo một kho lưu trữ cho ứng dụng Kinesis Producer và sao chép chúng vào Kinesis Producer EC2 instance. Sau khi hoàn tất, bạn có thể bắt đầu thử nghiệm truyền dữ liệu giữa hai máy thông qua Amazon Kinesis.
Quá trình tạo kho lưu trữ giống như các bước trước, nhưng chúng ta sẽ lặp lại nó ở đây để tiện theo dõi.
KinesisProducer
Sample application that publishes sales events into a Kinesis stream
.Trong Eclipse IDE, mở Git Perspective. Nếu bạn chưa có khung thông tin này, hãy sử dụng lựa chọn menu Window | Perspective | Open Perspective | Git repositories.
Chọn biểu tượng Clone repository
Trong AWS CodeCommit console trong trình duyệt, nhấp vào nút Clone URL. Chọn HTTPS từ menu và sau đó sao chép URL vào khay nhớ tạm của bạn.
Quay lại Eclipse IDE, dán URL bản sao vào trường URI:
Trong mục Authentication, ở mục User nhập vào giá trị GitUsername từ thông tin ở tab Cloudformation Output. Ở mục Password, nhập giá trị GitPassword từ thông tin ở tab Cloudformation Output.
Chọn Next.
Trong Branch Selection, không có branch nào được tạo, vì vậy chỉ cần chọn vào Next.
Trong bảng Local destination, chọn một đường dẫn phù hợp trên filesystem của bạn và chọn vào Finish:. Kho lưu trữ trống sẽ được sao chép vào đường dẫn local của bạn đã chỉ định.
Tải xuống gói mã nguồn từ KinesisProducer.zip và giải nén tập tin zip vào hệ thống của bạn.
Sao chép nội dung của tập tin zip đã giải nén sang vị trí nơi bạn đã sao chép kho KinesisProducer về.
Mở cửa sổ lệnh và chạy lệnh cd
vào thư mục tải về từ kho dữ liệu của bạn.
Tạo một branch master mới bằng lệnh sau:
git checkout -b master
git add .
git commit -m "Initial commit"
Branch sẽ được đẩy lên kho dữ liệu CodeCommit. Xác nhận bằng việc làm mới lại trang CodeCommit console trên trình duyệt. Bạn sẽ thấy các tập tin mà bạn đã thêm vào kho lưu trữ được liệt kê trong giao diện trình duyệt.