From 882708cb7239a5ca49f21992448dc0440188c56d Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Wed, 10 Apr 2024 18:38:16 +0100 Subject: [PATCH 1/2] First commit Signed-off-by: Milos Gajdos --- .github/workflows/ci.yaml | 31 ++++++- .gitignore | 21 +++++ README.md | 25 +++++- go.mod | 21 ++++- go.sum | 30 +++++++ gobot/main.go | 164 ++++++++++++++++++++++++++++++++++++++ rustbot/Cargo.toml | 14 ++++ rustbot/src/main.rs | 43 ++++++++++ 8 files changed, 341 insertions(+), 8 deletions(-) create mode 100644 go.sum create mode 100644 gobot/main.go create mode 100644 rustbot/Cargo.toml create mode 100644 rustbot/src/main.rs diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index f76dae2..8f1c47c 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -8,8 +8,8 @@ on: jobs: - ci: - name: Run CI + gobot: + name: Run gobot CI runs-on: ${{ matrix.os }} strategy: fail-fast: false @@ -85,3 +85,30 @@ jobs: uses: golangci/golangci-lint-action@v3 with: version: v1.56 + + rustbot: + name: Run rustbot CI + runs-on: ubuntu-latest + env: + CRATES_TOKEN: ${{ secrets.CRATES_TOKEN }} + + steps: + - name: check out code + uses: actions/checkout@v4 + + - name: cargo version and environment + id: vars + run: | + printf "Using cargo at: $(which cargo)\n" + printf "cargo version: $(cargo version)\n" + + - name: setup rust toolchain + uses: dtolnay/rust-toolchain@stable + + - name: run build + run: | + cargo check --manifest-path rustbot/Cargo.toml --verbose + cargo build --manifest-path rustbot/Cargo.toml --release --verbose + + - name: run tests + run: cargo test --manifest-path rustbot/Cargo.toml --verbose diff --git a/.gitignore b/.gitignore index 66fd13c..7bd0016 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,24 @@ # Dependency directories (remove the comment below to include it) # vendor/ + +# Generated by Cargo +# will have compiled files and executables +**/debug/ +**/target/ + +# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries +# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html +**/Cargo.lock + +# These are backup files generated by rustfmt +**/*.rs.bk + +# MSVC Windows builds of rustc generate these, which store debugging information +*.pdb +*.mp3 + + +# Added by cargo + +/target diff --git a/README.md b/README.md index 74e6321..d9e8b96 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,24 @@ -# go-repo-template +# bot-banter -[![Build Status](https://github.com/milosgajdos/go-repo-template/workflows/CI/badge.svg)](https://github.com/milosgajdos/go-repo-template/actions?query=workflow%3ACI) -[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/github.com/milosgajdos/go-repo-template) +[![Build Status](https://github.com/milosgajdos/bot-banter/workflows/CI/badge.svg)](https://github.com/milosgajdos/bot-banter/actions?query=workflow%3ACI) +[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/github.com/milosgajdos/bot-banter) [![License: Apache-2.0](https://img.shields.io/badge/License-Apache--2.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) -This is a `Go` project repo template following the [official guide](https://docs.github.com/en/free-pro-team@latest/github/creating-cloning-and-archiving-repositories/creating-a-template-repository). + +# HOWTO + +Run NATS +``` +nix-shell -p nats-server +nats-server -js +``` + +Start a `gobot`: +``` +go run ./gobot/... +``` + +Start a `rustbot`: +``` +cargo run --manifest-path rustbot/Cargo.toml +``` diff --git a/go.mod b/go.mod index e703591..c96a7c4 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,20 @@ -module github.com/milosgajdos/go-repo-template +module nats-jet -go 1.20 +go 1.22.2 + +require ( + github.com/nats-io/nats.go v1.34.1 + github.com/tmc/langchaingo v0.1.8 +) + +require ( + github.com/dlclark/regexp2 v1.10.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/klauspost/compress v1.17.2 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/pkoukk/tiktoken-go v0.1.6 // indirect + golang.org/x/crypto v0.18.0 // indirect + golang.org/x/sys v0.16.0 // indirect + golang.org/x/text v0.14.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..a17f437 --- /dev/null +++ b/go.sum @@ -0,0 +1,30 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dlclark/regexp2 v1.10.0 h1:+/GIL799phkJqYW+3YbOd8LCcbHzT0Pbo8zl70MHsq0= +github.com/dlclark/regexp2 v1.10.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= +github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4= +github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw= +github.com/pkoukk/tiktoken-go v0.1.6/go.mod h1:9NiV+i9mJKGj1rYOT+njbv+ZwA/zJxYdewGl6qVatpg= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/tmc/langchaingo v0.1.8 h1:nrImgh0aWdu3stJTHz80N60WGwPWY8HXCK10gQny7bA= +github.com/tmc/langchaingo v0.1.8/go.mod h1:iNBfS9e6jxBKsJSPWnlqNhoVWgdA3D1g5cdFJjbIZNQ= +golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/gobot/main.go b/gobot/main.go new file mode 100644 index 0000000..6fde79a --- /dev/null +++ b/gobot/main.go @@ -0,0 +1,164 @@ +package main + +import ( + "bufio" + "context" + "errors" + "log" + "os" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/tmc/langchaingo/llms" + "github.com/tmc/langchaingo/llms/ollama" +) + +func JetStreamRead(ctx context.Context, cons jetstream.Consumer, errCh chan error, done chan struct{}) { + defer log.Println("done reading from JetStream") + iter, err := cons.Messages() + if err != nil { + errCh <- err + return + } +ReadStreamDone: + for { + select { + case <-done: + break ReadStreamDone + default: + msg, err := iter.Next() + if err != nil { + errCh <- err + break + } + log.Printf("Received a JetStream message: %s", string(msg.Data())) + if err := msg.Ack(); err != nil { + errCh <- err + break + } + log.Println("Lets continue reading messages from JetStream") + } + } + iter.Stop() +} + +func JetStreamWrite(ctx context.Context, js jetstream.JetStream, chunks chan []byte, errCh chan error, done chan struct{}) { + defer log.Println("done writing to JetStream") + msg := []byte{} + for { + select { + case <-done: + return + case chunk := <-chunks: + if len(chunk) == 0 { + _, err := js.Publish(ctx, "rust", msg) + if err != nil { + errCh <- err + } + log.Println("Successfully published to JetStream") + return + } + msg = append(msg, chunk...) + } + } +} + +func LLMStream(ctx context.Context, js jetstream.JetStream, llm *ollama.LLM, prompt string, errCh chan error, done chan struct{}) { + defer log.Println("done streaming LLM") + + chunks := make(chan []byte) + go JetStreamWrite(ctx, js, chunks, errCh, done) + + _, err := llms.GenerateFromSinglePrompt(ctx, llm, prompt, + llms.WithStreamingFunc(func(ctx context.Context, chunk []byte) error { + select { + case <-done: + return nil + case chunks <- chunk: + return nil + } + })) + if err != nil { + errCh <- err + } +} + +func main() { + url := os.Getenv("NATS_URL") + if url == "" { + url = nats.DefaultURL + } + + nc, err := nats.Connect(url) + if err != nil { + log.Fatalf("failed connecting to nats: %v", err) + } + // nolint:errcheck + defer nc.Drain() + + js, err := jetstream.New(nc) + if err != nil { + log.Fatalf("failed creating a new jetstream: %v", err) + } + + ctx := context.Background() + stream, err := js.CreateStream(ctx, jetstream.StreamConfig{ + Name: "banter", + Subjects: []string{"go", "rust"}, + }) + if err != nil { + if !errors.Is(err, jetstream.ErrStreamNameAlreadyInUse) { + log.Fatalf("failed creating stream: %v", err) + } + var jsErr error + stream, jsErr = js.Stream(ctx, "banter") + if jsErr != nil { + log.Fatalf("failed getting JS handle: %v", jsErr) + } + } + log.Println("connected to stream") + + cons, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ + Durable: "go", + FilterSubject: "go", + }) + if err != nil { + log.Fatalf("failed creating go consumer: %v", err) + } + log.Println("created stream consumer") + + llm, err := ollama.New(ollama.WithModel("llama2")) + if err != nil { + log.Fatal("Failed creating LLM client: ", err) + } + + errCh := make(chan error, 1) + done := make(chan struct{}) + + go func() { + if err := <-errCh; err != nil { + log.Println("error streaming: ", err) + close(done) + } + }() + +GameOver: + for { + select { + case <-done: + break GameOver + default: + reader := bufio.NewReader(os.Stdin) + prompt, err := reader.ReadString('\n') + if err != nil { + log.Fatal("Failed reading seed prompt: ", err) + } + + ctx := context.Background() + go LLMStream(ctx, js, llm, prompt, errCh, done) + go JetStreamRead(ctx, cons, errCh, done) + + log.Println("Let's continue talking") + } + } +} diff --git a/rustbot/Cargo.toml b/rustbot/Cargo.toml new file mode 100644 index 0000000..852846d --- /dev/null +++ b/rustbot/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "rustbot" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-nats = { version = "0.34", features = ["service"] } +tokio = { version = "1", features = ["full"] } +futures = "0.3.21" +serde_json = "1.0.82" +serde = "1.0.139" +rand = "0.8" diff --git a/rustbot/src/main.rs b/rustbot/src/main.rs new file mode 100644 index 0000000..a0d340a --- /dev/null +++ b/rustbot/src/main.rs @@ -0,0 +1,43 @@ +use async_nats::jetstream::{self, consumer::pull, stream::Config}; +use futures::StreamExt; +use tokio; + +#[tokio::main] +async fn main() -> Result<(), async_nats::Error> { + let nats_url = + std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string()); + + let client = async_nats::connect(nats_url).await?; + + let js = jetstream::new(client); + + let stream = js + .get_or_create_stream(Config { + name: "banter".to_string(), + ..Default::default() + }) + .await?; + println!("connected to stream"); + + let cons = stream + .create_consumer(pull::Config { + durable_name: Some("rust".to_string()), + filter_subject: "rust".to_string(), + ..Default::default() + }) + .await?; + println!("created stream consumer"); + + let mut messages = cons.messages().await?; + while let Some(Ok(message)) = messages.next().await { + println!( + "Received a JetStream message {:?}", + message.payload.to_owned() + ); + message.ack().await?; + js.publish("go".to_string(), "Thx for the message Golang".into()) + .await?; + } + + Ok(()) +} From f0c7eab6939e2bee044f6aa186e6c99a31e75f6c Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Wed, 10 Apr 2024 18:42:31 +0100 Subject: [PATCH 2/2] Fix Go linter Signed-off-by: Milos Gajdos --- .golangci.yml | 3 +++ gobot/main.go | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 5fa9ef6..03962b0 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -13,6 +13,9 @@ linters: - goimports - prealloc +issues: + exclude-dirs: + - vendor run: deadline: 2m skip-dirs: diff --git a/gobot/main.go b/gobot/main.go index 6fde79a..d9bce29 100644 --- a/gobot/main.go +++ b/gobot/main.go @@ -13,7 +13,7 @@ import ( "github.com/tmc/langchaingo/llms/ollama" ) -func JetStreamRead(ctx context.Context, cons jetstream.Consumer, errCh chan error, done chan struct{}) { +func JetStreamRead(_ context.Context, cons jetstream.Consumer, errCh chan error, done chan struct{}) { defer log.Println("done reading from JetStream") iter, err := cons.Messages() if err != nil { @@ -70,7 +70,7 @@ func LLMStream(ctx context.Context, js jetstream.JetStream, llm *ollama.LLM, pro go JetStreamWrite(ctx, js, chunks, errCh, done) _, err := llms.GenerateFromSinglePrompt(ctx, llm, prompt, - llms.WithStreamingFunc(func(ctx context.Context, chunk []byte) error { + llms.WithStreamingFunc(func(_ context.Context, chunk []byte) error { select { case <-done: return nil