diff --git a/ww/pipe_line.c b/ww/pipe_line.c index 7125e10..c4597ac 100644 --- a/ww/pipe_line.c +++ b/ww/pipe_line.c @@ -5,20 +5,11 @@ #include "shiftbuffer.h" #include "tunnel.h" -struct pipe_line_s +typedef struct pipe_line_cstate_s { - void *memptr; - tunnel_t *self; - line_t *left_line; - line_t *right_line; - atomic_int refc; - atomic_bool closed; - uint8_t left_tid; - uint8_t right_tid; /* 8-bit pad */ - PipeLineFlowRoutine local_up_stream; - PipeLineFlowRoutine local_down_stream; - -} ATTR_ALIGNED_LINE_CACHE; + atomic_int refc; + atomic_bool closed; +} pipe_line_cstate_t; struct msg_event { @@ -45,15 +36,15 @@ void destroyPipeLineMsgPoolHandle(struct generic_pool_s *pool, pool_item_t *item static void lock(pipe_line_t *pl) { int old_refc = atomic_fetch_add_explicit(&pl->refc, 1, memory_order_relaxed); -// #ifndef RELEASE -// if (0 >= old_refc) -// { -// // this should not happen, otherwise we must change memory order -// // but i think its ok because threads synchronize around the mutex in eventloop -// LOGF("PipeLine: thread-safety done incorrectly lock()"); -// exit(1); -// } -// #endif + // #ifndef RELEASE + // if (0 >= old_refc) + // { + // // this should not happen, otherwise we must change memory order + // // but i think its ok because threads synchronize around the mutex in eventloop + // LOGF("PipeLine: thread-safety done incorrectly lock()"); + // exit(1); + // } + // #endif (void) old_refc; } @@ -62,15 +53,15 @@ static void unlock(pipe_line_t *pl) int old_refc = atomic_fetch_add_explicit(&pl->refc, -1, memory_order_relaxed); if (old_refc == 1) { -// #ifndef RELEASE -// if (! atomic_load_explicit(&(pl->closed), memory_order_relaxed)) -// { -// // this should not happen, otherwise we must change memory order -// // but i think its ok because threads synchronize around the mutex in eventloop -// LOGF("PipeLine: thread-safety done incorrectly unlock()"); -// exit(1); -// } -// #endif + // #ifndef RELEASE + // if (! atomic_load_explicit(&(pl->closed), memory_order_relaxed)) + // { + // // this should not happen, otherwise we must change memory order + // // but i think its ok because threads synchronize around the mutex in eventloop + // LOGF("PipeLine: thread-safety done incorrectly unlock()"); + // exit(1); + // } + // #endif globalFree((void *) pl->memptr); // NOLINT } } @@ -104,255 +95,342 @@ static void sendMessage(pipe_line_t *pl, MsgTargetFunction fn, void *arg, uint8_ hloop_post_event(getWorkerLoop(tid_to), &ev); } -static void writeBufferToLeftSide(pipe_line_t *pl, void *arg) +static void localPipeUpStreamInit(pipe_line_t *pl, void *arg) { - shift_buffer_t *buf = arg; - if (pl->left_line == NULL) - { - reuseBuffer(getWorkerBufferPool(pl->left_tid), buf); - return; - } - context_t *ctx = newContext(pl->left_line); - ctx->payload = buf; - pl->local_down_stream(pl->self, ctx, pl); + assert(self->up != NULL); + self->up->fnInitU(self->up, line); } -static void writeBufferToRightSide(pipe_line_t *pl, void *arg) +static void localPipeUpStreamEst(pipe_line_t *pl, void *arg) { - shift_buffer_t *buf = arg; - if (pl->right_line == NULL) - { - reuseBuffer(getWorkerBufferPool(pl->right_tid), buf); - return; - } - context_t *ctx = newContext(pl->right_line); - ctx->payload = buf; - pl->local_up_stream(pl->self, ctx, pl); + assert(self->up != NULL); + self->up->fnEstU(self->up, line); } -static void finishLeftSide(pipe_line_t *pl, void *arg) +static void localPipeUpStreamFin(pipe_line_t *pl, void *arg) { - (void) arg; - - if (pl->left_line == NULL) - { - return; - } - context_t *fctx = newFinContext(pl->left_line); - doneLineUpSide(pl->left_line); - pl->left_line = NULL; - pl->local_down_stream(pl->self, fctx, pl); - unlock(pl); + assert(self->up != NULL); + self->up->fnFinU(self->up, line); } -static void finishRightSide(pipe_line_t *pl, void *arg) +static void localPipeUpStreamPayload(pipe_line_t *pl, void *arg) { - (void) arg; - if (pl->right_line == NULL) - { - return; - } - context_t *fctx = newFinContext(pl->right_line); - doneLineDownSide(pl->right_line); - destroyLine(pl->right_line); - pl->right_line = NULL; - pl->local_up_stream(pl->self, fctx, pl); - unlock(pl); + assert(self->up != NULL); + self->up->fnPayloadU(self->up, line, payload); } -static void pauseLeftLine(pipe_line_t *pl, void *arg) +static void localPipeUpStreamPause(pipe_line_t *pl, void *arg) { - (void) arg; - if (pl->left_line == NULL) - { - return; - } - pauseLineDownSide(pl->left_line); + assert(self->up != NULL); + self->up->fnPauseU(self->up, line); } -static void pauseRightLine(pipe_line_t *pl, void *arg) +static void localPipeUpStreamResume(pipe_line_t *pl, void *arg) { - (void) arg; - if (pl->right_line == NULL) - { - return; - } - pauseLineUpSide(pl->right_line); + assert(self->up != NULL); + self->up->fnResumeU(self->up, line); } -static void resumeLeftLine(pipe_line_t *pl, void *arg) +void pipeUpStreamInit(tunnel_t *self, line_t *line) { - (void) arg; - if (pl->left_line == NULL) - { - return; - } - resumeLineDownSide(pl->left_line); + assert(self->up != NULL); + self->up->fnInitU(self->up, line); } -static void resumeRightLine(pipe_line_t *pl, void *arg) +void pipeUpStreamEst(tunnel_t *self, line_t *line) { - (void) arg; - if (pl->right_line == NULL) - { - return; - } - resumeLineUpSide(pl->right_line); + assert(self->up != NULL); + self->up->fnEstU(self->up, line); } -void pipeOnUpLinePaused(void *state) +void pipeUpStreamFin(tunnel_t *self, line_t *line) { - pipe_line_t *pl = state; - if (atomic_load_explicit(&pl->closed, memory_order_relaxed)) - { - return; - } - sendMessage(pl, pauseLeftLine, NULL, pl->right_tid, pl->left_tid); + assert(self->up != NULL); + self->up->fnFinU(self->up, line); } -void pipeOnUpLineResumed(void *state) +void pipeUpStreamPayload(tunnel_t *self, line_t *line, shift_buffer_t *payload) { - pipe_line_t *pl = state; - if (atomic_load_explicit(&pl->closed, memory_order_relaxed)) - { - return; - } - sendMessage(pl, resumeLeftLine, NULL, pl->right_tid, pl->left_tid); + assert(self->up != NULL); + self->up->fnPayloadU(self->up, line, payload); } -void pipeOnDownLinePaused(void *state) +void pipeUpStreamPause(tunnel_t *self, line_t *line) { - pipe_line_t *pl = state; - if (atomic_load_explicit(&pl->closed, memory_order_relaxed)) - { - return; - } - sendMessage(pl, pauseRightLine, NULL, pl->left_tid, pl->right_tid); + assert(self->up != NULL); + self->up->fnPauseU(self->up, line); } -void pipeOnDownLineResumed(void *state) +void pipeUpStreamResume(tunnel_t *self, line_t *line) { - pipe_line_t *pl = state; - if (atomic_load_explicit(&pl->closed, memory_order_relaxed)) - { - return; - } - sendMessage(pl, resumeRightLine, NULL, pl->left_tid, pl->right_tid); + assert(self->up != NULL); + self->up->fnResumeU(self->up, line); } -bool pipeSendToUpStream(pipe_line_t *pl, context_t *c) -{ - if (WW_UNLIKELY(c->est)) - { - destroyContext(c); - return true; - } - // other flags are not supposed to come to pipe line - assert(c->fin || c->payload != NULL); - assert(pl->left_line); +// static void writeBufferToLeftSide(pipe_line_t *pl, void *arg) +// { +// shift_buffer_t *buf = arg; +// if (pl->left_line == NULL) +// { +// reuseBuffer(getWorkerBufferPool(pl->left_tid), buf); +// return; +// } +// context_t *ctx = newContext(pl->left_line); +// ctx->payload = buf; +// pl->local_down_stream(pl->self, ctx, pl); +// } + +// static void writeBufferToRightSide(pipe_line_t *pl, void *arg) +// { +// shift_buffer_t *buf = arg; +// if (pl->right_line == NULL) +// { +// reuseBuffer(getWorkerBufferPool(pl->right_tid), buf); +// return; +// } +// context_t *ctx = newContext(pl->right_line); +// ctx->payload = buf; +// pl->local_up_stream(pl->self, ctx, pl); +// } - if (c->fin) - { - doneLineUpSide(pl->left_line); - pl->left_line = NULL; - - bool expected = false; - - if (atomic_compare_exchange_strong_explicit(&(pl->closed), &expected, true, memory_order_relaxed, - memory_order_relaxed)) - { - // we managed to close the channel - destroyContext(c); - sendMessage(pl, finishRightSide, NULL, pl->left_tid, pl->right_tid); - return true; - } - // other line managed to close first and also queued us the fin packet - return false; - } +// static void finishLeftSide(pipe_line_t *pl, void *arg) +// { +// (void) arg; - if (atomic_load_explicit(&pl->closed, memory_order_relaxed)) - { - return false; - } - assert(c->payload != NULL); +// if (pl->left_line == NULL) +// { +// return; +// } +// context_t *fctx = newFinContext(pl->left_line); +// doneLineUpSide(pl->left_line); +// pl->left_line = NULL; +// pl->local_down_stream(pl->self, fctx, pl); +// unlock(pl); +// } + +// static void finishRightSide(pipe_line_t *pl, void *arg) +// { +// (void) arg; +// if (pl->right_line == NULL) +// { +// return; +// } +// context_t *fctx = newFinContext(pl->right_line); +// doneLineDownSide(pl->right_line); +// destroyLine(pl->right_line); +// pl->right_line = NULL; +// pl->local_up_stream(pl->self, fctx, pl); +// unlock(pl); +// } + +// static void pauseLeftLine(pipe_line_t *pl, void *arg) +// { +// (void) arg; +// if (pl->left_line == NULL) +// { +// return; +// } +// pauseLineDownSide(pl->left_line); +// } +// static void pauseRightLine(pipe_line_t *pl, void *arg) +// { +// (void) arg; +// if (pl->right_line == NULL) +// { +// return; +// } +// pauseLineUpSide(pl->right_line); +// } - sendMessage(pl, writeBufferToRightSide, c->payload, pl->left_tid, pl->right_tid); - dropContexPayload(c); - destroyContext(c); +// static void resumeLeftLine(pipe_line_t *pl, void *arg) +// { +// (void) arg; +// if (pl->left_line == NULL) +// { +// return; +// } +// resumeLineDownSide(pl->left_line); +// } - return true; -} +// static void resumeRightLine(pipe_line_t *pl, void *arg) +// { +// (void) arg; +// if (pl->right_line == NULL) +// { +// return; +// } +// resumeLineUpSide(pl->right_line); +// } -bool pipeSendToDownStream(pipe_line_t *pl, context_t *c) -{ - // est context is ignored, only fin or data makes sense - if (WW_UNLIKELY(c->est)) - { - destroyContext(c); - return true; - } +// void pipeOnUpLinePaused(void *state) +// { +// pipe_line_t *pl = state; +// if (atomic_load_explicit(&pl->closed, memory_order_relaxed)) +// { +// return; +// } +// sendMessage(pl, pauseLeftLine, NULL, pl->right_tid, pl->left_tid); +// } - // other flags are not supposed to come to pipe line - assert(c->fin || c->payload != NULL); - assert(pl->right_line); +// void pipeOnUpLineResumed(void *state) +// { +// pipe_line_t *pl = state; +// if (atomic_load_explicit(&pl->closed, memory_order_relaxed)) +// { +// return; +// } +// sendMessage(pl, resumeLeftLine, NULL, pl->right_tid, pl->left_tid); +// } - if (c->fin) - { - doneLineDownSide(pl->right_line); - destroyLine(pl->right_line); - pl->right_line = NULL; - - bool expected = false; - - if (atomic_compare_exchange_strong_explicit(&(pl->closed), &expected, true, memory_order_relaxed, - memory_order_relaxed)) - { - // we managed to close the channel - destroyContext(c); - sendMessage(pl, finishLeftSide, NULL, pl->right_tid, pl->left_tid); - return true; - } - // other line managed to close first and also queued us the fin packet - return false; - } +// void pipeOnDownLinePaused(void *state) +// { +// pipe_line_t *pl = state; +// if (atomic_load_explicit(&pl->closed, memory_order_relaxed)) +// { +// return; +// } +// sendMessage(pl, pauseRightLine, NULL, pl->left_tid, pl->right_tid); +// } - if (atomic_load_explicit(&pl->closed, memory_order_relaxed)) - { - return false; - } - assert(c->payload != NULL); +// void pipeOnDownLineResumed(void *state) +// { +// pipe_line_t *pl = state; +// if (atomic_load_explicit(&pl->closed, memory_order_relaxed)) +// { +// return; +// } +// sendMessage(pl, resumeRightLine, NULL, pl->left_tid, pl->right_tid); +// } + +// bool pipeSendToUpStream(pipe_line_t *pl, context_t *c) +// { +// if (WW_UNLIKELY(c->est)) +// { +// destroyContext(c); +// return true; +// } +// // other flags are not supposed to come to pipe line +// assert(c->fin || c->payload != NULL); +// assert(pl->left_line); +// if (c->fin) +// { +// doneLineUpSide(pl->left_line); +// pl->left_line = NULL; - sendMessage(pl, writeBufferToLeftSide, c->payload, pl->right_tid, pl->left_tid); - dropContexPayload(c); - destroyContext(c); +// bool expected = false; - return true; -} +// if (atomic_compare_exchange_strong_explicit(&(pl->closed), &expected, true, memory_order_relaxed, +// memory_order_relaxed)) +// { +// // we managed to close the channel +// destroyContext(c); +// sendMessage(pl, finishRightSide, NULL, pl->left_tid, pl->right_tid); +// return true; +// } +// // other line managed to close first and also queued us the fin packet +// return false; +// } + +// if (atomic_load_explicit(&pl->closed, memory_order_relaxed)) +// { +// return false; +// } +// assert(c->payload != NULL); + +// sendMessage(pl, writeBufferToRightSide, c->payload, pl->left_tid, pl->right_tid); +// dropContexPayload(c); +// destroyContext(c); + +// return true; +// } + +// bool pipeSendToDownStream(pipe_line_t *pl, context_t *c) +// { +// // est context is ignored, only fin or data makes sense +// if (WW_UNLIKELY(c->est)) +// { +// destroyContext(c); +// return true; +// } -static void initRight(pipe_line_t *pl, void *arg) +// // other flags are not supposed to come to pipe line +// assert(c->fin || c->payload != NULL); +// assert(pl->right_line); + +// if (c->fin) +// { +// doneLineDownSide(pl->right_line); +// destroyLine(pl->right_line); +// pl->right_line = NULL; + +// bool expected = false; + +// if (atomic_compare_exchange_strong_explicit(&(pl->closed), &expected, true, memory_order_relaxed, +// memory_order_relaxed)) +// { +// // we managed to close the channel +// destroyContext(c); +// sendMessage(pl, finishLeftSide, NULL, pl->right_tid, pl->left_tid); +// return true; +// } +// // other line managed to close first and also queued us the fin packet +// return false; +// } + +// if (atomic_load_explicit(&pl->closed, memory_order_relaxed)) +// { +// return false; +// } +// assert(c->payload != NULL); + +// sendMessage(pl, writeBufferToLeftSide, c->payload, pl->right_tid, pl->left_tid); +// dropContexPayload(c); +// destroyContext(c); + +// return true; +// } + +// static void initRight(pipe_line_t *pl, void *arg) +// { +// (void) arg; +// pl->right_line = newLine(pl->right_tid); +// pl->right_line->dw_piped = true; +// setupLineDownSide(pl->right_line, pipeOnUpLinePaused, pl, pipeOnUpLineResumed); +// context_t *context = newInitContext(pl->right_line); +// pl->local_up_stream(pl->self, context, pl); +// } + +// static void initLeft(pipe_line_t *pl, void *arg) +// { +// (void) arg; +// pl->left_line->up_piped = true; + +// setupLineUpSide(pl->left_line, pipeOnDownLinePaused, pl, pipeOnDownLineResumed); +// } + +void pipeTo(tunnel_t *t, line_t *l, tid_t tid) { - (void) arg; - pl->right_line = newLine(pl->right_tid); - pl->right_line->dw_piped = true; - setupLineDownSide(pl->right_line, pipeOnUpLinePaused, pl, pipeOnUpLineResumed); - context_t *context = newInitContext(pl->right_line); - pl->local_up_stream(pl->self, context, pl); + assert(l->up_piped == false); + assert(l->tid != tid); + + tunnel_t *pt = ((uint8_t *) t) - (sizeof(tunnel_t) + t->tstate_size); + + l->up_piped = true; + sendMessage(pl, localPipeUpStreamInit, NULL, l->tid, tid); } -static void initLeft(pipe_line_t *pl, void *arg) +tunnel_t *newPipeTunnel(tunnel_t *t) { - (void) arg; - pl->left_line->up_piped = true; + size_t tstate_size = sizeof(tunnel_t) + t->tstate_size; + size_t cstate_size = sizeof(pipe_line_cstate_t) + t->cstate_size; + // dont forget cstate offset - setupLineUpSide(pl->left_line, pipeOnDownLinePaused, pl, pipeOnDownLineResumed); -} + tunnel_t *encapsulated_tunnel = newTunnel(state_size, cstate_size); -void newPipeLine(tunnel_t *self, line_t *left_line, uint8_t dest_tid, PipeLineFlowRoutine local_up_stream, - PipeLineFlowRoutine local_down_stream) + setTunnelState(encapsulated_tunnel, t); -{ assert(sizeof(struct pipe_line_s) <= kCpuLineCacheSize); int64_t memsize = (int64_t) sizeof(struct pipe_line_s); diff --git a/ww/pipe_line.h b/ww/pipe_line.h index 5d50fbd..597f91a 100644 --- a/ww/pipe_line.h +++ b/ww/pipe_line.h @@ -47,11 +47,20 @@ typedef struct pipe_line_s pipe_line_t; pool_item_t *allocPipeLineMsgPoolHandle(generic_pool_t *pool); void destroyPipeLineMsgPoolHandle(generic_pool_t *pool, pool_item_t *item); -void pipeOnUpLinePaused(void *state); -void pipeOnUpLineResumed(void *state); -void pipeOnDownLineResumed(void *state); -void pipeOnDownLinePaused(void *state); -bool pipeSendToUpStream(pipe_line_t *pl, context_t *c); -bool pipeSendToDownStream(pipe_line_t *pl, context_t *c); -void newPipeLine(tunnel_t *self, line_t *left_line, uint8_t dest_tid, PipeLineFlowRoutine local_up_stream, - PipeLineFlowRoutine local_down_stream); + +void pipeUpStreamInit(tunnel_t *self, line_t *line); +void pipeUpStreamEst(tunnel_t *self, line_t *line); +void pipeUpStreamFin(tunnel_t *self, line_t *line); +void pipeUpStreamPayload(tunnel_t *self, line_t *line, shift_buffer_t *payload); +void pipeUpStreamPause(tunnel_t *self, line_t *line); +void pipeUpStreamResume(tunnel_t *self, line_t *line); + +void pipeOnUpLinePaused(void *state); +void pipeOnUpLineResumed(void *state); +void pipeOnDownLineResumed(void *state); +void pipeOnDownLinePaused(void *state); +bool pipeSendToUpStream(pipe_line_t *pl, context_t *c); +bool pipeSendToDownStream(pipe_line_t *pl, context_t *c); + +void pipeTo(tunnel_t *t, line_t *l, tid_t tid); +tunnel_t *newPipeTunnel(tunnel_t *t); diff --git a/ww/pipe_tunnel.c b/ww/pipe_tunnel.c new file mode 100644 index 0000000..cecc9fe --- /dev/null +++ b/ww/pipe_tunnel.c @@ -0,0 +1,146 @@ +#include "tunnel.h" +#include "pipe_line.h" + + +static void defaultPipeTunnelUpStreamInit(tunnel_t *self, line_t *line) +{ + assert(self->up != NULL); + if(isUpPiped(line)){ + pipeUpStream(context_t *c) + } + self->up->fnInitU(self->up, line); +} + +static void defaultPipeTunnelUpStreamEst(tunnel_t *self, line_t *line) +{ + assert(self->up != NULL); + self->up->fnEstU(self->up, line); +} + +static void defaultPipeTunnelUpStreamFin(tunnel_t *self, line_t *line) +{ + assert(self->up != NULL); + self->up->fnFinU(self->up, line); +} + +static void defaultPipeTunnelUpStreamPayload(tunnel_t *self, line_t *line, shift_buffer_t *payload) +{ + assert(self->up != NULL); + self->up->fnPayloadU(self->up, line, payload); +} + +static void defaultPipeTunnelUpStreamPause(tunnel_t *self, line_t *line) +{ + assert(self->up != NULL); + self->up->fnPauseU(self->up, line); +} + +static void defaultPipeTunnelUpStreamResume(tunnel_t *self, line_t *line) +{ + assert(self->up != NULL); + self->up->fnResumeU(self->up, line); +} + +static void defaultPipeTunnelUpStreamGetBufInfo(tunnel_t *self, tunnel_buffinfo_t *info) +{ + assert(info->len < kMaxChainLen); + + info->tuns[(info->len)++] = self; + + assert(self->up != NULL); + self->up->fnGBufInfoU(self->up, info); +} + +static void defaultPipeTunneldownStreamInit(tunnel_t *self, line_t *line) +{ + assert(self->dw != NULL); + self->up->fnInitD(self->up, line); +} + +static void defaultPipeTunneldownStreamEst(tunnel_t *self, line_t *line) +{ + assert(self->dw != NULL); + self->up->fnEstD(self->up, line); +} + +static void defaultPipeTunneldownStreamFin(tunnel_t *self, line_t *line) +{ + assert(self->dw != NULL); + self->up->fnFinD(self->up, line); +} + +static void defaultPipeTunneldownStreamPayload(tunnel_t *self, line_t *line, shift_buffer_t *payload) +{ + assert(self->dw != NULL); + self->up->fnPayloadD(self->up, line, payload); +} + +static void defaultPipeTunnelDownStreamPause(tunnel_t *self, line_t *line) +{ + assert(self->dw != NULL); + self->up->fnPauseD(self->up, line); +} + +static void defaultPipeTunnelDownStreamResume(tunnel_t *self, line_t *line) +{ + assert(self->dw != NULL); + self->up->fnResumeD(self->up, line); +} + +static void defaultPipeTunnelDownStreamGetBufInfo(tunnel_t *self, tunnel_buffinfo_t *info) +{ + assert(info->len < kMaxChainLen); + + info->tuns[(info->len)++] = self; + + assert(self->dw != NULL); + self->up->fnGBufInfoD(self->up, info); +} + +static void defaultPipeTunnelOnChainingComplete(tunnel_t *self) +{ + (void) self; +} + +static void defaultPipeTunnelBeforeChainStart(tunnel_t *self) +{ + (void) self; +} + +static void defaultPipeTunnelOnChainStart(tunnel_t *self) +{ + (void) self; +} + +tunnel_t *newTunnel(uint16_t tstate_size, uint16_t cstate_size) +{ + tunnel_t *ptr = globalMalloc(sizeof(tunnel_t) + tstate_size); + + *ptr = (tunnel_t) {.cstate_size = cstate_size, + .fnInitU = &defaultPipeTunnelUpStreamInit, + .fnInitD = &defaultPipeTunneldownStreamInit, + .fnPayloadU = &defaultPipeTunnelUpStreamPayload, + .fnPayloadD = &defaultPipeTunneldownStreamPayload, + .fnEstU = &defaultPipeTunnelUpStreamEst, + .fnEstD = &defaultPipeTunneldownStreamEst, + .fnFinU = &defaultPipeTunnelUpStreamFin, + .fnFinD = &defaultPipeTunneldownStreamFin, + .fnPauseU = &defaultPipeTunnelUpStreamPause, + .fnPauseD = &defaultPipeTunnelDownStreamPause, + .fnResumeU = &defaultPipeTunnelUpStreamResume, + .fnResumeD = &defaultPipeTunnelDownStreamResume, + .fnGBufInfoU = &defaultPipeTunnelUpStreamGetBufInfo, + .fnGBufInfoD = &defaultPipeTunnelDownStreamGetBufInfo, + .onChainingComplete = &defaultPipeTunnelOnChainingComplete, + .beforeChainStart = &defaultPipeTunnelBeforeChainStart, + .onChainStart = &defaultPipeTunnelOnChainStart}; + + + + return ptr; +} + + + + + diff --git a/ww/tunnel.c b/ww/tunnel.c index 0395927..41350d1 100644 --- a/ww/tunnel.c +++ b/ww/tunnel.c @@ -31,50 +31,38 @@ void chain(tunnel_t *from, tunnel_t *to) static void defaultUpStreamInit(tunnel_t *self, line_t *line) { - if (self->up != NULL) - { - self->up->fnInitU(self->up, line); - } + assert(self->up != NULL); + self->up->fnInitU(self->up, line); } static void defaultUpStreamEst(tunnel_t *self, line_t *line) { - if (self->up != NULL) - { - self->up->fnEstU(self->up, line); - } + assert(self->up != NULL); + self->up->fnEstU(self->up, line); } static void defaultUpStreamFin(tunnel_t *self, line_t *line) { - if (self->up != NULL) - { - self->up->fnFinU(self->up, line); - } + assert(self->up != NULL); + self->up->fnFinU(self->up, line); } static void defaultUpStreamPayload(tunnel_t *self, line_t *line, shift_buffer_t *payload) { - if (self->up != NULL) - { - self->up->fnPayloadU(self->up, line, payload); - } + assert(self->up != NULL); + self->up->fnPayloadU(self->up, line, payload); } static void defaultUpStreamPause(tunnel_t *self, line_t *line) { - if (self->up != NULL) - { - self->up->fnPauseU(self->up, line); - } + assert(self->up != NULL); + self->up->fnPauseU(self->up, line); } static void defaultUpStreamResume(tunnel_t *self, line_t *line) { - if (self->up != NULL) - { - self->up->fnResumeU(self->up, line); - } + assert(self->up != NULL); + self->up->fnResumeU(self->up, line); } static void defaultUpStreamGetBufInfo(tunnel_t *self, tunnel_buffinfo_t *info) @@ -83,58 +71,44 @@ static void defaultUpStreamGetBufInfo(tunnel_t *self, tunnel_buffinfo_t *info) info->tuns[(info->len)++] = self; - if (self->up != NULL) - { - self->up->fnGBufInfoU(self->up, info); - } + assert(self->up != NULL); + self->up->fnGBufInfoU(self->up, info); } static void defaultdownStreamInit(tunnel_t *self, line_t *line) { - if (self->up != NULL) - { - self->up->fnInitD(self->up, line); - } + assert(self->dw != NULL); + self->up->fnInitD(self->up, line); } static void defaultdownStreamEst(tunnel_t *self, line_t *line) { - if (self->up != NULL) - { - self->up->fnEstD(self->up, line); - } + assert(self->dw != NULL); + self->up->fnEstD(self->up, line); } static void defaultdownStreamFin(tunnel_t *self, line_t *line) { - if (self->up != NULL) - { - self->up->fnFinD(self->up, line); - } + assert(self->dw != NULL); + self->up->fnFinD(self->up, line); } static void defaultdownStreamPayload(tunnel_t *self, line_t *line, shift_buffer_t *payload) { - if (self->up != NULL) - { - self->up->fnPayloadD(self->up, line, payload); - } + assert(self->dw != NULL); + self->up->fnPayloadD(self->up, line, payload); } static void defaultDownStreamPause(tunnel_t *self, line_t *line) { - if (self->up != NULL) - { - self->up->fnPauseD(self->up, line); - } + assert(self->dw != NULL); + self->up->fnPauseD(self->up, line); } static void defaultDownStreamResume(tunnel_t *self, line_t *line) { - if (self->up != NULL) - { - self->up->fnResumeD(self->up, line); - } + assert(self->dw != NULL); + self->up->fnResumeD(self->up, line); } static void defaultDownStreamGetBufInfo(tunnel_t *self, tunnel_buffinfo_t *info) @@ -143,10 +117,8 @@ static void defaultDownStreamGetBufInfo(tunnel_t *self, tunnel_buffinfo_t *info) info->tuns[(info->len)++] = self; - if (self->up != NULL) - { - self->up->fnGBufInfoD(self->up, info); - } + assert(self->dw != NULL); + self->up->fnGBufInfoD(self->up, info); } static void defaultOnChainingComplete(tunnel_t *self) @@ -164,31 +136,30 @@ static void defaultOnChainStart(tunnel_t *self) (void) self; } -tunnel_t *newTunnel(void) +tunnel_t *newTunnel(uint16_t tstate_size, uint16_t cstate_size) { - tunnel_t *ptr = globalMalloc(sizeof(tunnel_t)); - - tunnel_t tunnel = (tunnel_t) { - - .fnInitU = &defaultUpStreamInit, - .fnInitD = &defaultdownStreamInit, - .fnPayloadU = &defaultUpStreamPayload, - .fnPayloadD = &defaultdownStreamPayload, - .fnEstU = &defaultUpStreamEst, - .fnEstD = &defaultdownStreamEst, - .fnFinU = &defaultUpStreamFin, - .fnFinD = &defaultdownStreamFin, - .fnPauseU = &defaultUpStreamPause, - .fnPauseD = &defaultDownStreamPause, - .fnResumeU = &defaultUpStreamResume, - .fnResumeD = &defaultDownStreamResume, - .fnGBufInfoU = &defaultUpStreamGetBufInfo, - .fnGBufInfoD = &defaultDownStreamGetBufInfo, - .onChainingComplete = &defaultOnChainingComplete, - .beforeChainStart = &defaultBeforeChainStart, - .onChainStart = &defaultOnChainStart}; - - memcpy(ptr, &tunnel, sizeof(tunnel_t)); + tunnel_t *ptr = globalMalloc(sizeof(tunnel_t) + tstate_size); + + *ptr = (tunnel_t) {.cstate_size = cstate_size, + .fnInitU = &defaultUpStreamInit, + .fnInitD = &defaultdownStreamInit, + .fnPayloadU = &defaultUpStreamPayload, + .fnPayloadD = &defaultdownStreamPayload, + .fnEstU = &defaultUpStreamEst, + .fnEstD = &defaultdownStreamEst, + .fnFinU = &defaultUpStreamFin, + .fnFinD = &defaultdownStreamFin, + .fnPauseU = &defaultUpStreamPause, + .fnPauseD = &defaultDownStreamPause, + .fnResumeU = &defaultUpStreamResume, + .fnResumeD = &defaultDownStreamResume, + .fnGBufInfoU = &defaultUpStreamGetBufInfo, + .fnGBufInfoD = &defaultDownStreamGetBufInfo, + .onChainingComplete = &defaultOnChainingComplete, + .beforeChainStart = &defaultBeforeChainStart, + .onChainStart = &defaultOnChainStart}; + + return ptr; } @@ -267,8 +238,8 @@ static void defaultPipeLocalDownStream(struct tunnel_s *self, struct context_s * } } -void pipeTo(tunnel_t *self, line_t *l, tid_t tid) -{ - assert(l->up_state == NULL); - newPipeLine(self, l, tid, defaultPipeLocalUpStream, defaultPipeLocalDownStream); -} +// void pipeTo(tunnel_t *self, line_t *l, tid_t tid) +// { +// assert(l->up_state == NULL); +// newPipeLine(self, l, tid, defaultPipeLocalUpStream, defaultPipeLocalDownStream); +// } diff --git a/ww/tunnel.h b/ww/tunnel.h index 4fe0d7a..d7eb33b 100644 --- a/ww/tunnel.h +++ b/ww/tunnel.h @@ -99,11 +99,13 @@ typedef uint32_t line_refc_t; typedef struct line_s { - line_refc_t refc; - tid_t tid; - bool alive; - bool up_piped; - bool dw_piped; + line_refc_t refc; + tid_t tid; + bool alive; + bool up_piped; + bool dw_piped; + uint8_t auth_cur; + // void *up_state; // void *dw_state; // LineFlowSignal up_pause_cb; @@ -112,8 +114,11 @@ typedef struct line_s // LineFlowSignal dw_resume_cb; socket_context_t src_ctx; socket_context_t dest_ctx; - void *chains_state[kMaxChainLen]; - uint8_t auth_cur; + + struct line_s *up_pipeline; + struct line_s *dw_pipeline; + + uintptr_t *chains_state[] __attribute__((aligned(sizeof(void *)))); } line_t; @@ -185,24 +190,26 @@ typedef struct tunnel_s TunnelStatusCb beforeChainStart; TunnelStatusCb onChainStart; - uint16_t cstate_offset; - uint16_t cstate_size; uint16_t tstate_size; + uint16_t cstate_size; + + uint16_t cstate_offset; uint16_t chain_index; - uint8_t _pad_[4]; - uintptr_t state[]; + uint8_t state[] __attribute__((aligned(sizeof(void *)))); } tunnel_t; -tunnel_t *newTunnel(void); +tunnel_t *newTunnel(uint16_t tstate_size, uint16_t cstate_size); void destroyTunnel(tunnel_t *self); void chain(tunnel_t *from, tunnel_t *to); void chainDown(tunnel_t *from, tunnel_t *to); void chainUp(tunnel_t *from, tunnel_t *to); -void pipeUpStream(context_t *c); -void pipeDownStream(context_t *c); -void pipeTo(tunnel_t *self, line_t *l, tid_t tid); + +static inline void setTunnelState(tunnel_t *self, void *state) +{ + memcpy(&(self->state[0]), state, self->tstate_size); +} // pool handles, instead of malloc / free for the generic pool pool_item_t *allocLinePoolHandle(struct generic_pool_s *pool);