Skip to content

Commit

Permalink
make session a query parameter (#282)
Browse files Browse the repository at this point in the history
  • Loading branch information
dgrove-oss authored Mar 22, 2022
1 parent f854211 commit 187167a
Show file tree
Hide file tree
Showing 13 changed files with 106 additions and 89 deletions.
13 changes: 7 additions & 6 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@ jobs:
script:
- ./scripts/docker-compose-start.sh || travis_terminate 1
- make docker-run-containerized-python-examples
- name: local-released
script:
- ./scripts/docker-compose-start.sh || travis_terminate 1
- make cli || travis_terminate 1
- ./ci/testJSLocal.sh
- ./ci/testJavaLocal.sh
# Disabled due to backwards incompatible sidecar API changes
# - name: local-released
# script:
# - ./scripts/docker-compose-start.sh || travis_terminate 1
# - make cli || travis_terminate 1
# - ./ci/testJSLocal.sh
# - ./ci/testJavaLocal.sh
- name: in-cluster
script:
- ./scripts/kind-start.sh || travis_terminate 1
Expand Down
16 changes: 9 additions & 7 deletions core/internal/runtime/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ func handlerService(ctx context.Context, target rpc.Service, value []byte) ([]by

func handlerActor(ctx context.Context, target rpc.Session, instance *rpc.SessionInstance, requestID string, value []byte) (*rpc.Destination, []byte, error) {
actor := Actor{Type: target.Name, ID: target.ID}
session := target.Flow + ":" + requestID
var reply []byte = nil
var err error = nil
var msg map[string]string
Expand Down Expand Up @@ -336,7 +337,7 @@ func handlerActor(ctx context.Context, target rpc.Session, instance *rpc.Session

var dest *rpc.Destination = nil
if !instance.Activated {
reply, err = activate(ctx, actor, msg["command"] == "call", msg["path"])
reply, err = activate(ctx, actor, session, msg)
if reply != nil {
// activate returned an application-level error, do not retry
err = nil
Expand All @@ -348,7 +349,7 @@ func handlerActor(ctx context.Context, target rpc.Session, instance *rpc.Session
if instance.Activated {
// invoke actor method
metricLabel := actor.Type + ":" + msg["path"] // compute metric label before we augment the path with id+flow
msg["path"] = actorRuntimeRoutePrefix + actor.Type + "/" + actor.ID + "/" + target.Flow + ":" + requestID + msg["path"]
msg["path"] = actorRuntimeRoutePrefix + actor.Type + "/" + actor.ID + msg["path"] + "?session=" + session
msg["content-type"] = "application/kar+json"
msg["method"] = "POST"

Expand Down Expand Up @@ -527,20 +528,21 @@ func getActorInformation(ctx context.Context, msg map[string]string) ([]byte, er
}

// activate an actor
func activate(ctx context.Context, actor Actor, isCall bool, causingMethod string) ([]byte, error) {
reply, err := invoke(ctx, "GET", map[string]string{"path": actorRuntimeRoutePrefix + actor.Type + "/" + actor.ID}, actor.Type+":activate")
func activate(ctx context.Context, actor Actor, session string, causingMsg map[string]string) ([]byte, error) {
activatePath := actorRuntimeRoutePrefix + actor.Type + "/" + actor.ID + "?session=" + session
reply, err := invoke(ctx, "GET", map[string]string{"path": activatePath}, actor.Type+":activate")
if err != nil {
if err != ctx.Err() {
logger.Debug("activate failed to invoke %s: %v", actorRuntimeRoutePrefix+actor.Type+"/"+actor.ID, err)
}
return nil, err
}
if reply.StatusCode >= http.StatusBadRequest {
if isCall {
logger.Debug("activate %v returned status %v with body %s, aborting call %s", actor, reply.StatusCode, reply.Payload, causingMethod)
if causingMsg["command"] == "call" {
logger.Debug("activate %v returned status %v with body %s, aborting call %s", actor, reply.StatusCode, reply.Payload, causingMsg["path"])
} else {
// Log at error level becasue there is no one waiting on the method reponse to notice the failure.
logger.Error("activate %v returned status %v with body %s, aborting tell %s", actor, reply.StatusCode, reply.Payload, causingMethod)
logger.Error("activate %v returned status %v with body %s, aborting tell %s", actor, reply.StatusCode, reply.Payload, causingMsg["path"])
}
return json.Marshal(reply)
}
Expand Down
16 changes: 5 additions & 11 deletions core/internal/runtime/routes-docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func dummy1() {}
//
func dummy2() {}

// swagger:route POST /impl/v1/actor/{actorType}/{actorId}/{session}/{methodName} actor-runtime idImplActorPost
// swagger:route POST /impl/v1/actor/{actorType}/{actorId}/{methodName} actor-runtime idImplActorPost
//
// actor invocation
//
Expand Down Expand Up @@ -288,19 +288,13 @@ type methodParam struct {
}

// swagger:parameters idActorCall
// swagger:parameters idImplActorGet
// swagger:parameters idImplActorPost
type sessionParam struct {
// Optionally specific the session to use when performing the call. Enables re-entrancy for nested actor calls.
// The session is an opaque string used by the KAR runtime to enable reentrancy
// for nested actor calls and to track caller-callee relationships for failure recovery
// in:query
// required:false
// swagger:strfmt uuid
Session string `json:"session"`
}

// swagger:parameters idImplActorPost
type sessionPathParam struct {
// The session to use for the actor method invocation.
// in:path
// swagger:strfmt uuid
Session string `json:"session"`
}

Expand Down
18 changes: 11 additions & 7 deletions docs/api/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@
"name": "actorId",
"in": "path",
"required": true
},
{
"type": "string",
"x-go-name": "Session",
"description": "The session is an opaque string used by the KAR runtime to enable reentrancy\nfor nested actor calls and to track caller-callee relationships for failure recovery",
"name": "session",
"in": "query"
}
],
"responses": {
Expand Down Expand Up @@ -126,7 +133,7 @@
}
}
},
"/impl/v1/actor/{actorType}/{actorId}/{session}/{methodName}": {
"/impl/v1/actor/{actorType}/{actorId}/{methodName}": {
"post": {
"description": "### Invoke an actor method of the specified actor instance\n\nInvokes the actor method on the actor instance within the session specified in the path.\nThe body of the request will contain the actual paramters on which to invoke the method.",
"consumes": [
Expand Down Expand Up @@ -171,12 +178,10 @@
},
{
"type": "string",
"format": "uuid",
"x-go-name": "Session",
"description": "The session to use for the actor method invocation.",
"description": "The session is an opaque string used by the KAR runtime to enable reentrancy\nfor nested actor calls and to track caller-callee relationships for failure recovery",
"name": "session",
"in": "path",
"required": true
"in": "query"
},
{
"example": "[3, 'hello', { msg: 'Greetings' }]",
Expand Down Expand Up @@ -285,9 +290,8 @@
},
{
"type": "string",
"format": "uuid",
"x-go-name": "Session",
"description": "Optionally specific the session to use when performing the call. Enables re-entrancy for nested actor calls.",
"description": "The session is an opaque string used by the KAR runtime to enable reentrancy\nfor nested actor calls and to track caller-callee relationships for failure recovery",
"name": "session",
"in": "query"
},
Expand Down
23 changes: 15 additions & 8 deletions docs/api/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,13 @@ paths:
required: true
type: string
x-go-name: ActorID
- description: |-
The session is an opaque string used by the KAR runtime to enable reentrancy
for nested actor calls and to track caller-callee relationships for failure recovery
in: query
name: session
type: string
x-go-name: Session
responses:
"200":
$ref: '#/responses/response200'
Expand All @@ -328,7 +335,7 @@ paths:
summary: actor allocation
tags:
- actor-runtime
/impl/v1/actor/{actorType}/{actorId}/{session}/{methodName}:
/impl/v1/actor/{actorType}/{actorId}/{methodName}:
post:
consumes:
- application/kar+json
Expand Down Expand Up @@ -358,11 +365,11 @@ paths:
required: true
type: string
x-go-name: MethodName
- description: The session to use for the actor method invocation.
format: uuid
in: path
- description: |-
The session is an opaque string used by the KAR runtime to enable reentrancy
for nested actor calls and to track caller-callee relationships for failure recovery
in: query
name: session
required: true
type: string
x-go-name: Session
- description: A possibly empty array containing the arguments with which to
Expand Down Expand Up @@ -454,9 +461,9 @@ paths:
required: true
type: string
x-go-name: MethodName
- description: Optionally specific the session to use when performing the call. Enables
re-entrancy for nested actor calls.
format: uuid
- description: |-
The session is an opaque string used by the KAR runtime to enable reentrancy
for nested actor calls and to track caller-callee relationships for failure recovery
in: query
name: session
type: string
Expand Down
4 changes: 2 additions & 2 deletions examples/actors-dp-js/tester.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ async function main () {
const cafe = actor.proxy('Cafe', 'Cafe+de+Flore')

console.log('Serving a meal:')
const table = await actor.call(cafe, 'seatTable', 20, 5)
const table = await actor.rootCall(cafe, 'seatTable', 20, 5)

let occupancy = 1
while (occupancy > 0 & !failure) {
occupancy = await actor.call(cafe, 'occupancy', table)
occupancy = await actor.rootCall(cafe, 'occupancy', table)
console.log(`Table occupancy is ${occupancy}`)
await sleep(2000)
countdown = countdown - 1
Expand Down
10 changes: 5 additions & 5 deletions examples/actors-ykt/ykt-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ function prettyPrintHistogram (header, bucketSizeInMS, histogram) {
async function summaryReport (division) {
const summary = { onboarding: 0, home: 0, commuting: 0, working: 0, meeting: 0, coffee: 0, lunch: 0 }
for (const site of division) {
const sr = await actor.call(site.proxy, 'siteReport')
const sr = await actor.rootCall(site.proxy, 'siteReport')
summary.onboarding += sr.onboarding || 0
summary.home += sr.home || 0
summary.commuting += sr.commuting || 0
Expand Down Expand Up @@ -79,21 +79,21 @@ async function main () {
}

for (const site of researchDivision) {
await actor.call(site.proxy, 'resetDelayStats')
await actor.rootCall(site.proxy, 'resetDelayStats')
await actor.reminders.schedule(site.proxy, 'siteReport', { id: 'clientSiteReport', targetTime: new Date(Date.now() + 1000), period: '1s' })
await actor.call(ibm, 'hire', Object.assign({ site: site.proxy.kar.id }, site.params))
await actor.rootCall(ibm, 'hire', Object.assign({ site: site.proxy.kar.id }, site.params))
}

while (true) {
await sleep(5000)
const employees = await actor.call(ibm, 'count')
const employees = await actor.rootCall(ibm, 'count')
console.log(`Num employees is ${employees}`)
if (employees === 0) {
const summary = { reminderDelays: [], tellLatencies: [] }
let bucketSizeInMS
for (const site of researchDivision) {
console.log(`Valiadating ${site.proxy.kar.id}`)
const sr = await actor.call(site.proxy, 'siteReport')
const sr = await actor.rootCall(site.proxy, 'siteReport')
if (sr.siteEmployees !== 0) {
console.log(`FAILURE: ${sr.siteEmployees} stranded employees at ${site.proxy.kar.id}`)
failure = true
Expand Down
2 changes: 1 addition & 1 deletion examples/unit-tests/actor-loop.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async function main () {
let x = 0
const a = actor.proxy('Foo', 'myInstance')
for (let i = 0; i < 5000; i++) {
x = await actor.call(a, 'incr', x)
x = await actor.rootCall(a, 'incr', x)
console.log(i, '->', x)
}
console.log('=>', x)
Expand Down
18 changes: 9 additions & 9 deletions examples/unit-tests/test-harness.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,15 @@ async function actorTests () {

// external synchronous invocation of an actor method
for (let i = 0; i < 25; i++) {
const x = await actor.call(actor.proxy('Foo', 'anotherInstance'), 'incrQuiet', i)
const x = await actor.rootCall(actor.proxy('Foo', 'anotherInstance'), 'incrQuiet', i)
if (x !== i + 1) {
console.log(`Failed! incr(${i}) returned ${x}`)
failure = true
}
}

// synchronous invocation via the actor
const v6 = await actor.call(a, 'incr', 42)
const v6 = await actor.rootCall(a, 'incr', 42)
if (v6 !== 43) {
console.log(`Failed: unexpected result from incr ${v6}`)
failure = true
Expand All @@ -216,7 +216,7 @@ async function actorTests () {
}

// getter
const v7 = await actor.call(a, 'field')
const v7 = await actor.rootCall(a, 'field')
if (v7 !== 42) {
console.log(`Failed: getter of 'field' returned ${v7}`)
failure = true
Expand All @@ -225,7 +225,7 @@ async function actorTests () {
console.log('Testing actor invocation error handling')
// error in synchronous invocation
try {
console.log(await actor.call(a, 'fail', 'error message 123'))
console.log(await actor.rootCall(a, 'fail', 'error message 123'))
console.log('Failed to raise expected error')
failure = true
} catch (err) {
Expand All @@ -234,15 +234,15 @@ async function actorTests () {

// undefined method
try {
console.log(await actor.call(a, 'missing', 'error message 123'))
console.log(await actor.rootCall(a, 'missing', 'error message 123'))
console.log('Failed. No error raised invoking missing method')
failure = true
} catch (err) {
if (verbose) console.log('Caught expected error: ', err.message)
}

// reentrancy
const v9 = await actor.call(a, 'reenter', 42)
const v9 = await actor.rootCall(a, 'reenter', 42)
if (v9 !== 43) {
console.log(`Failed: unexpected result from reenter ${v9}`)
failure = true
Expand All @@ -266,15 +266,15 @@ async function pubSubTests () {

await events.createTopic(topic)

const v1 = await actor.call(a, 'pubsub', topic)
const v1 = await actor.rootCall(a, 'pubsub', topic)
if (v1 !== 'OK') {
console.log('Failed: pubsub')
failure = true
}

let i
for (i = 30; i > 0; i--) { // poll
const v2 = await actor.call(a, 'check', topic)
const v2 = await actor.rootCall(a, 'check', topic)
if (v2 === true) break
await new Promise(resolve => setTimeout(resolve, 500)) // wait
}
Expand All @@ -300,7 +300,7 @@ async function testTermination (failure) {
}

async function main () {
var failure = false
let failure = false

console.log('*** Service Tests ***')
failure |= await serviceTests()
Expand Down
Loading

0 comments on commit 187167a

Please sign in to comment.