Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First commit #1

Merged
merged 2 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ on:

jobs:

ci:
name: Run CI
gobot:
name: Run gobot CI
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
Expand Down Expand Up @@ -85,3 +85,30 @@ jobs:
uses: golangci/golangci-lint-action@v3
with:
version: v1.56

rustbot:
name: Run rustbot CI
runs-on: ubuntu-latest
env:
CRATES_TOKEN: ${{ secrets.CRATES_TOKEN }}

steps:
- name: check out code
uses: actions/checkout@v4

- name: cargo version and environment
id: vars
run: |
printf "Using cargo at: $(which cargo)\n"
printf "cargo version: $(cargo version)\n"

- name: setup rust toolchain
uses: dtolnay/rust-toolchain@stable

- name: run build
run: |
cargo check --manifest-path rustbot/Cargo.toml --verbose
cargo build --manifest-path rustbot/Cargo.toml --release --verbose

- name: run tests
run: cargo test --manifest-path rustbot/Cargo.toml --verbose
21 changes: 21 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,24 @@

# Dependency directories (remove the comment below to include it)
# vendor/

# Generated by Cargo
# will have compiled files and executables
**/debug/
**/target/

# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
**/Cargo.lock

# These are backup files generated by rustfmt
**/*.rs.bk

# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb
*.mp3


# Added by cargo

/target
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ linters:
- goimports
- prealloc

issues:
exclude-dirs:
- vendor
run:
deadline: 2m
skip-dirs:
Expand Down
25 changes: 21 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,24 @@
# go-repo-template
# bot-banter

[![Build Status](https://github.com/milosgajdos/go-repo-template/workflows/CI/badge.svg)](https://github.com/milosgajdos/go-repo-template/actions?query=workflow%3ACI)
[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/github.com/milosgajdos/go-repo-template)
[![Build Status](https://github.com/milosgajdos/bot-banter/workflows/CI/badge.svg)](https://github.com/milosgajdos/bot-banter/actions?query=workflow%3ACI)
[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/github.com/milosgajdos/bot-banter)
[![License: Apache-2.0](https://img.shields.io/badge/License-Apache--2.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)

This is a `Go` project repo template following the [official guide](https://docs.github.com/en/free-pro-team@latest/github/creating-cloning-and-archiving-repositories/creating-a-template-repository).

# HOWTO

Run NATS
```
nix-shell -p nats-server
nats-server -js
```

Start a `gobot`:
```
go run ./gobot/...
```

Start a `rustbot`:
```
cargo run --manifest-path rustbot/Cargo.toml
```
21 changes: 19 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
module github.com/milosgajdos/go-repo-template
module nats-jet

go 1.20
go 1.22.2

require (
github.com/nats-io/nats.go v1.34.1
github.com/tmc/langchaingo v0.1.8
)

require (
github.com/dlclark/regexp2 v1.10.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pkoukk/tiktoken-go v0.1.6 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
)
30 changes: 30 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dlclark/regexp2 v1.10.0 h1:+/GIL799phkJqYW+3YbOd8LCcbHzT0Pbo8zl70MHsq0=
github.com/dlclark/regexp2 v1.10.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4=
github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw=
github.com/pkoukk/tiktoken-go v0.1.6/go.mod h1:9NiV+i9mJKGj1rYOT+njbv+ZwA/zJxYdewGl6qVatpg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tmc/langchaingo v0.1.8 h1:nrImgh0aWdu3stJTHz80N60WGwPWY8HXCK10gQny7bA=
github.com/tmc/langchaingo v0.1.8/go.mod h1:iNBfS9e6jxBKsJSPWnlqNhoVWgdA3D1g5cdFJjbIZNQ=
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
164 changes: 164 additions & 0 deletions gobot/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package main

import (
"bufio"
"context"
"errors"
"log"
"os"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/tmc/langchaingo/llms"
"github.com/tmc/langchaingo/llms/ollama"
)

func JetStreamRead(_ context.Context, cons jetstream.Consumer, errCh chan error, done chan struct{}) {
defer log.Println("done reading from JetStream")
iter, err := cons.Messages()
if err != nil {
errCh <- err
return
}
ReadStreamDone:
for {
select {
case <-done:
break ReadStreamDone
default:
msg, err := iter.Next()
if err != nil {
errCh <- err
break
}
log.Printf("Received a JetStream message: %s", string(msg.Data()))
if err := msg.Ack(); err != nil {
errCh <- err
break
}
log.Println("Lets continue reading messages from JetStream")
}
}
iter.Stop()
}

func JetStreamWrite(ctx context.Context, js jetstream.JetStream, chunks chan []byte, errCh chan error, done chan struct{}) {
defer log.Println("done writing to JetStream")
msg := []byte{}
for {
select {
case <-done:
return
case chunk := <-chunks:
if len(chunk) == 0 {
_, err := js.Publish(ctx, "rust", msg)
if err != nil {
errCh <- err
}
log.Println("Successfully published to JetStream")
return
}
msg = append(msg, chunk...)
}
}
}

func LLMStream(ctx context.Context, js jetstream.JetStream, llm *ollama.LLM, prompt string, errCh chan error, done chan struct{}) {
defer log.Println("done streaming LLM")

chunks := make(chan []byte)
go JetStreamWrite(ctx, js, chunks, errCh, done)

_, err := llms.GenerateFromSinglePrompt(ctx, llm, prompt,
llms.WithStreamingFunc(func(_ context.Context, chunk []byte) error {
select {
case <-done:
return nil
case chunks <- chunk:
return nil
}
}))
if err != nil {
errCh <- err
}
}

func main() {
url := os.Getenv("NATS_URL")
if url == "" {
url = nats.DefaultURL
}

nc, err := nats.Connect(url)
if err != nil {
log.Fatalf("failed connecting to nats: %v", err)
}
// nolint:errcheck
defer nc.Drain()

js, err := jetstream.New(nc)
if err != nil {
log.Fatalf("failed creating a new jetstream: %v", err)
}

ctx := context.Background()
stream, err := js.CreateStream(ctx, jetstream.StreamConfig{
Name: "banter",
Subjects: []string{"go", "rust"},
})
if err != nil {
if !errors.Is(err, jetstream.ErrStreamNameAlreadyInUse) {
log.Fatalf("failed creating stream: %v", err)
}
var jsErr error
stream, jsErr = js.Stream(ctx, "banter")
if jsErr != nil {
log.Fatalf("failed getting JS handle: %v", jsErr)
}
}
log.Println("connected to stream")

cons, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "go",
FilterSubject: "go",
})
if err != nil {
log.Fatalf("failed creating go consumer: %v", err)
}
log.Println("created stream consumer")

llm, err := ollama.New(ollama.WithModel("llama2"))
if err != nil {
log.Fatal("Failed creating LLM client: ", err)
}

errCh := make(chan error, 1)
done := make(chan struct{})

go func() {
if err := <-errCh; err != nil {
log.Println("error streaming: ", err)
close(done)
}
}()

GameOver:
for {
select {
case <-done:
break GameOver
default:
reader := bufio.NewReader(os.Stdin)
prompt, err := reader.ReadString('\n')
if err != nil {
log.Fatal("Failed reading seed prompt: ", err)
}

ctx := context.Background()
go LLMStream(ctx, js, llm, prompt, errCh, done)
go JetStreamRead(ctx, cons, errCh, done)

log.Println("Let's continue talking")
}
}
}
14 changes: 14 additions & 0 deletions rustbot/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "rustbot"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-nats = { version = "0.34", features = ["service"] }
tokio = { version = "1", features = ["full"] }
futures = "0.3.21"
serde_json = "1.0.82"
serde = "1.0.139"
rand = "0.8"
43 changes: 43 additions & 0 deletions rustbot/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use async_nats::jetstream::{self, consumer::pull, stream::Config};
use futures::StreamExt;
use tokio;

#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
let nats_url =
std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());

let client = async_nats::connect(nats_url).await?;

let js = jetstream::new(client);

let stream = js
.get_or_create_stream(Config {
name: "banter".to_string(),
..Default::default()
})
.await?;
println!("connected to stream");

let cons = stream
.create_consumer(pull::Config {
durable_name: Some("rust".to_string()),
filter_subject: "rust".to_string(),
..Default::default()
})
.await?;
println!("created stream consumer");

let mut messages = cons.messages().await?;
while let Some(Ok(message)) = messages.next().await {
println!(
"Received a JetStream message {:?}",
message.payload.to_owned()
);
message.ack().await?;
js.publish("go".to_string(), "Thx for the message Golang".into())
.await?;
}

Ok(())
}
Loading