VI. Emitting Data
The final step in Kraken is emitting data to Kafka. Kafka will then deliver the data to any of the destinations we specified in our connectors.

In a production environment, the scheduler will create a set of jobs based on what's supplied in the `produce` block. For our `TutorialWorker`, we will emit our name data. Please see Task Basics if you need a refresher on how `produce`, `perform_async`, and `perform` work.

Our payload will be a hash (dictionary) containing our first and last name information, as specified by our schema. If we were to run this in production, the same data would be written to S3 every minute. The `produce` block is intended to create jobs based on the configuration we specify in our `config.yml` or in our `perform` method. For test purposes, this is sufficient.
```ruby
module Tutorial
  class TutorialWorker < ::BaseWorker
    register_connector :s3_json, "tutorial.test"

    schema "com.mashable.kraken.tutorial.test" do
      required :first_name, :string
      optional :middle_name, :string
      required :last_name, :string
    end

    produce({ every: 1.minute }) do
      data = { first_name: "Benjamin", last_name: "Anderson" }
      # TutorialWorker jobs will take our data and pass it to our perform method
      perform_async(data)
    end

    def perform(data)
      # todo
    end
  end
end
```
The last step is to actually emit our data to Kafka and Redis. Redis is used to catch duplicates and prevent the re-emitting of records. To do this, we use a method called `add_member`. `add_member` takes a key and a member identifier; its signature is `add_member(string_key, string_member)`.

We can add our data/member to Redis like so:
```ruby
def perform(data)
  # Emit a unique key and all of our data to Redis
  # (the member should usually be a unique identifier)
  add_member("#{data[:first_name]}-tutorial-test", data.to_s)
end
```
To emit our data to Kafka so that it can be exported to S3, we use a method from `BaseWorker` called `emit`. `emit` takes the following parameters: `emit(String topic_name, Hash payload, { schema: String schema_name })`.
- `topic_name` is the name of the channel we want to emit to (it must match the name we specified in our connector)
- `payload` is the data we want to send to Kafka
- `schema_name` is the name of the schema we want to use to encode the payload. It should match the name we specified in our `schema(name) do` definition.
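Mapping those parameters onto our tutorial worker, a single call would look like the following (every value comes from the worker we defined above):

```ruby
# topic_name -> 'tutorial.test'                       (matches register_connector)
# payload    -> a hash with the fields our schema declares
# schema:    -> 'com.mashable.kraken.tutorial.test'   (matches our schema name)
emit 'tutorial.test',
     { first_name: "Benjamin", last_name: "Anderson" },
     schema: 'com.mashable.kraken.tutorial.test'
```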
Our final importer would be implemented as follows:
```ruby
module Tutorial
  class TutorialWorker < ::BaseWorker
    register_connector :s3_json, "tutorial.test"

    schema "com.mashable.kraken.tutorial.test" do
      required :first_name, :string
      optional :middle_name, :string
      required :last_name, :string
    end

    produce({ every: 1.minute }) do
      data = { first_name: "Benjamin", last_name: "Anderson" }
      perform_async(data)
    end

    def perform(data)
      emit 'tutorial.test', data, schema: 'com.mashable.kraken.tutorial.test'
    end
  end
end
```
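As a possible variation (not part of the tutorial itself), `add_member` and `emit` can be combined so that each record is emitted at most once. This sketch assumes `add_member` returns a truthy value only for previously unseen members, as Redis's `SADD` does; check the `BaseWorker` implementation before relying on that behavior.

```ruby
def perform(data)
  # Only emit records we haven't recorded before. The truthy-on-new return
  # value of add_member is an assumption based on Redis SADD semantics.
  if add_member("#{data[:first_name]}-tutorial-test", data.to_s)
    emit 'tutorial.test', data, schema: 'com.mashable.kraken.tutorial.test'
  end
end
```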
Kraken normally expects a Kafka cluster to coordinate workers and collect data. However, setting one up can be a large task, and it isn't necessary for developing and testing workers. Instead, you can use standalone mode, which instructs Kraken to operate in a reduced-functionality mode backed by a local Redis instance.
Before we can run our worker in the Kraken console, we need to start Redis. If you already have Redis up and running, you can skip this section.

It is recommended to install Redis via Homebrew:
```sh
brew install redis
```
You should also install `lunchy` for easier service management:
```sh
brew install lunchy
```
Finally, you can start the Redis service:
```sh
lunchy start redis
```
Next, we will start the Redis command-line interface to monitor the information going into Redis. Run the following in a new terminal tab:
```sh
redis-cli MONITOR
```
You should get a status code of "OK".
We might want to manually test the worker to make sure it's doing what we want. We can do this by starting Kraken in `STANDALONE` mode, then instantiating and invoking the worker directly.
```sh
STANDALONE=1 ./bin/kraken console
```
This gives us an interactive Ruby console with all the Kraken resources loaded. Our `perform` method expects a payload, so we'll define one:

```
2.4.1 :001 > data = { first_name: "Ben", last_name: "Anderson" }
 => {:first_name=>"Ben", :last_name=>"Anderson"}
```
Next, let's run our worker.

```
2.4.1 :002 > Tutorial::TutorialWorker.new.perform(data)
```
TODO: `emit` in `STANDALONE` mode is currently a no-op. We will make it write out to Redis for observation/testing; this is currently under development.
Press enter and go back to the terminal where you ran the `redis-cli MONITOR` command. You should see something similar to the output below.

```
1509066081.401145 [1 127.0.0.1:61673] "lpush" "com.mashable.kraken.tutorial.test" "{\"first_name\"=>\"Ben\", \"last_name\"=>\"Anderson\", \"_timestamp\"=>\"2017-10-26T20:01:21.395-05:00\"}"
```
You can boot the Kraken daemon with:
```sh
STANDALONE=true bin/kraken server
```
This will start all the parts: the scheduler, the worker pools, and so on. Every minute, Kraken will produce a job, and a worker will consume it. You will see log messages indicating as much. `Ctrl-C` will shut down the server.
(TODO)