Python library for consuming Kinesis Data Stream.

hengfengli/kcpy

Kinesis Consumer in Python

A Kinesis consumer written purely in Python. It is a lightweight wrapper on top of boto3, the AWS SDK for Python. You can also consume records from a Kinesis Data Stream (KDS) via:

  • Lambda function: the kinesis-lambda-sqs-demo project shows how to consume records in a serverless, real-time way.
  • Kinesis Firehose: an AWS managed service that saves records into different sinks, such as S3, Elasticsearch, and Redshift.

Installation

Install the package via pip:

pip install kcpy

Getting started

from kcpy import StreamConsumer
consumer = StreamConsumer('my_stream_name')
for record in consumer:
    print(record)

The output would look like:

{
    'ApproximateArrivalTimestamp': datetime.datetime(2018, 11, 13, 11, 57, 55, 117807), 
    'Data': b'Jessica Walter', 
    'PartitionKey': 'Jessica Walter', 
    'SequenceNumber': '1'
}

Or, you can consume stream data with checkpointing:

from kcpy import StreamConsumer
consumer = StreamConsumer('my_stream_name', consumer_name='my_consumer', checkpoint=True)
for record in consumer:
    print(record)
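
Each record yielded by the consumer is a dict shaped like the sample output above, with the payload in `Data` as raw bytes. A minimal sketch of decoding it follows; the helper name `decode_record` and the UTF-8 assumption are illustrative and not part of kcpy.

```python
import datetime


def decode_record(record, encoding="utf-8"):
    """Return (partition_key, text) for a record dict shaped like the sample output.

    Assumes the producer wrote text in the given encoding (hypothetical choice).
    """
    data = record["Data"]
    # Kinesis payloads arrive as bytes; decode them before further processing.
    text = data.decode(encoding) if isinstance(data, bytes) else str(data)
    return record["PartitionKey"], text


# A record copied from the sample output above, so the sketch runs without AWS.
sample = {
    "ApproximateArrivalTimestamp": datetime.datetime(2018, 11, 13, 11, 57, 55, 117807),
    "Data": b"Jessica Walter",
    "PartitionKey": "Jessica Walter",
    "SequenceNumber": "1",
}

key, text = decode_record(sample)
print(key, text)
```

In a real consumer loop, the same helper would be called on each `record` inside `for record in consumer:`.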

Checkpointing

The diagram below shows the checkpointing schema. Each consumer stores the last sequence number it has read from each shard:

                                                                   producer
[stream_1]                                                            |
+---------------+---+---+---+---+---+---+---+---+                     |
| shard_1       | 1 | 2 | 3 | 4 | 5 | 6 | 7 |...| <-------------------+
+---------------+---+---+---+---+---+---+---+---+                     |
| shard_2       | 1 | 2 | 3 | 4 | 5 |...| <---------------------------+
+---------------+---+---+---+---+---+---+---+---+---+                 |
| shard_3       | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |...| <---------------+
+---------------+---+---+---+---+---+---+---+---+---+
                          ^                   ^
                          |                   |
                      consumer_1          consumer_2
                          |                   |
                          |                   +---------+
                          |                             |
                          +------------------+          |
                                             |          |
                                             v          |
+---------------+-------------+----------+--------+     |
| consumer_name | stream_name | shard_id | seq_no |     |
+---------------+-------------+----------+--------+     |
| consumer_1    | stream_1    | shard_1  |   5    |     |
| consumer_1    | stream_1    | shard_2  |   15   |     |
| consumer_1    | stream_1    | ...      |   15   |     |
| consumer_1    | stream_1    | shard_N  |   XX   |     |
| consumer_2    | stream_1    | shard_1  |   6    | <---+
+---------------+-------------+----------+--------+
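
The table in the diagram can be sketched with Python's built-in `sqlite3` module. This is an illustration under assumptions, not kcpy's actual storage code: the table name `checkpoints` and the helper functions are hypothetical, and only the four columns come from the diagram.

```python
import sqlite3

# Columns mirror the diagram. seq_no is TEXT because real Kinesis sequence
# numbers are long decimal strings.
SCHEMA = """
CREATE TABLE IF NOT EXISTS checkpoints (
    consumer_name TEXT NOT NULL,
    stream_name   TEXT NOT NULL,
    shard_id      TEXT NOT NULL,
    seq_no        TEXT NOT NULL,
    PRIMARY KEY (consumer_name, stream_name, shard_id)
)
"""


def open_store(path=":memory:"):
    """Open (or create) the checkpoint database."""
    conn = sqlite3.connect(path)
    conn.execute(SCHEMA)
    return conn


def save_checkpoint(conn, consumer_name, stream_name, shard_id, seq_no):
    """Record the last sequence number a consumer has processed on a shard."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT OR REPLACE INTO checkpoints VALUES (?, ?, ?, ?)",
            (consumer_name, stream_name, shard_id, str(seq_no)),
        )


def load_checkpoint(conn, consumer_name, stream_name, shard_id):
    """Return the saved sequence number, or None if there is no checkpoint."""
    row = conn.execute(
        "SELECT seq_no FROM checkpoints "
        "WHERE consumer_name = ? AND stream_name = ? AND shard_id = ?",
        (consumer_name, stream_name, shard_id),
    ).fetchone()
    return row[0] if row else None


conn = open_store()
save_checkpoint(conn, "consumer_1", "stream_1", "shard_1", 5)
save_checkpoint(conn, "consumer_2", "stream_1", "shard_1", 6)
save_checkpoint(conn, "consumer_1", "stream_1", "shard_1", 7)  # overwrites 5
print(load_checkpoint(conn, "consumer_1", "stream_1", "shard_1"))
```

Keying on (consumer_name, stream_name, shard_id) lets two consumers keep independent positions on the same shard, as consumer_1 and consumer_2 do in the diagram.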

Features

  • Read records from a stream with multiple shards
  • Save a checkpoint per consumer for each shard of a stream

Todo

  • Add type checking with mypy
  • Add tox for automating multiple testing environments
  • Add the config for travis CI
  • Support other storage solutions (MySQL, DynamoDB, Redis, etc.) for checkpointing
  • Rebalance when the number of shards changes
  • Allow kcpy to run on multiple machines

Changelog

0.1.7

  • Add Travis CI config and drop Python 3.5 support.

0.1.6

  • Fix some issues in setup.py.

0.1.5

  • Add consumer checkpointing with a simple SQLite storage solution.

0.1.4

  • Pass AWS configuration directly to the boto3 client.

0.1.3

  • Update the README.

0.1.2

  • Add markdown support for long description.

0.1.1

  • Add a long description.

0.1.0

  • First version of kcpy.

License

Copyright (c) 2018 Hengfeng Li. It is free software, and may be redistributed under the terms specified in the LICENSE file.