Commit

Merge pull request #14 from leonvanbokhorst/nova_kafka_poc
Enhance: Parallelize NOVA layer processing
leonvanbokhorst authored Dec 14, 2024
2 parents be45189 + 851c5f8 commit 0da29fb
Showing 1 changed file with 117 additions and 68 deletions.

predictive_coding/03_kafka_nova_poc.py
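
The heart of the change is running the three NOVA layers concurrently instead of one after another, so total latency is bounded by the slowest layer rather than the sum of all three. Below is a minimal, self-contained sketch of that pattern; the layer names and delays are illustrative stand-ins, not code taken from this repository.

import asyncio
import time


async def fake_layer(name: str, delay: float) -> dict:
    # Stand-in for a NOVA layer's async process() method
    await asyncio.sleep(delay)
    return {"layer": name, "finished_at": time.time()}


async def main() -> dict:
    # gather() schedules all coroutines before awaiting, so the three
    # "layers" overlap instead of running back to back
    results = await asyncio.gather(
        fake_layer("reactive", 0.05),
        fake_layer("responsive", 0.2),
        fake_layer("reflective", 0.4),
    )
    return {r["layer"]: r for r in results}


if __name__ == "__main__":
    print(asyncio.run(main()))

Run sequentially, the toy pipeline takes roughly 0.65 s; run concurrently, it finishes in roughly 0.4 s, the duration of the slowest layer. The diff follows.
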
@@ -57,6 +57,59 @@
import time
from typing import Dict, Any
import asyncio
import logging


logger = logging.getLogger(__name__)


class KafkaPublishError(Exception):
    """Raised when there is an error publishing messages to Kafka"""

    pass


class NOVALayerError(Exception):
    """Base exception for NOVA layer errors"""

    pass


def timed_process(func):
"""Decorator to add timing information to layer processing"""

async def wrapper(self, message: Dict[str, Any], *args, **kwargs) -> Dict[str, Any]:
start_time = time.time()
try:
result = await func(self, message, *args, **kwargs)
end_time = time.time()

# If result is already a dict, update it; otherwise create new dict
if isinstance(result, dict):
result.update(
{
"start_time": start_time,
"end_time": end_time,
"processing_duration": end_time - start_time,
}
)
return result
else:
return {
"result": result,
"start_time": start_time,
"end_time": end_time,
"processing_duration": end_time - start_time,
}
except Exception as e:
logger.error(
"Layer processing failed",
extra={"layer": self.__class__.__name__, "error": str(e)},
exc_info=True,
)
raise NOVALayerError(f"Layer processing failed: {e}") from e

return wrapper


class NOVALayer:
@@ -101,43 +154,58 @@ def __del__(self):

    def publish(self, topic: str, message: Dict[str, Any]):
        """
        Non-blocking publish to a Kafka topic.

        Args:
            topic (str): Kafka topic to publish to
            message (Dict[str, Any]): Message content in dictionary format
        """
        try:
            self.producer.produce(
                topic,
                json.dumps(message).encode("utf-8"),
                callback=self.delivery_report,
            )
            self.producer.poll(0)  # Non-blocking poll for callbacks
        except Exception as e:
            logger.error(
                "Failed to publish message to Kafka",
                extra={
                    "topic": topic,
                    "error": str(e),
                    "layer": self.__class__.__name__,
                },
                exc_info=True,
            )
            raise KafkaPublishError(f"Failed to publish message to Kafka: {e}") from e

    def delivery_report(self, err, msg):
        """Callback for Kafka message delivery confirmation"""
        if err is not None:
            logger.error(
                "Message delivery failed",
                extra={
                    "topic": msg.topic(),
                    "error": str(err),
                    "layer": self.__class__.__name__,
                },
            )
            # Note: can't raise here as this runs as a producer callback.
            # Consider implementing a message retry mechanism
        else:
            logger.debug(
                "Message delivered successfully",
                extra={"topic": msg.topic(), "layer": self.__class__.__name__},
            )


class ReactiveLayer(NOVALayer):
"""
Fast response layer (50-300ms)
Fast response layer (50-100ms)
Handles immediate responses with minimal processing.
Similar to gamma wave processing in the brain.
Processing characteristics:
- Fastest response time
- Minimal context consideration
- Basic pattern matching
"""

@timed_process
async def process(self, message: Dict[str, Any]) -> Dict[str, Any]:
"""
Quick processing of immediate responses
Expand All @@ -148,32 +216,24 @@ async def process(self, message: Dict[str, Any]) -> Dict[str, Any]:
Returns:
Dict[str, Any]: Processed response
"""
# Simulate gamma-wave processing time without blocking
await asyncio.sleep(0.1) # 100ms

response = {
await asyncio.sleep(0.05) # 50ms for immediate response
return {
"type": "reactive_response",
"content": f"Quick acknowledgment: {message.get('content', '')}",
"timestamp": time.time(),
}

self.publish("nova.reactive.output", response)
return response


class ResponsiveLayer(NOVALayer):
"""
Context-aware layer (300-1000ms)
Context-aware layer (100-300ms)
Processes information with awareness of immediate context.
Similar to beta wave processing in the brain.
Processing characteristics:
- Medium response time
- Context integration
- Short-term pattern recognition
"""

@timed_process
async def process(self, message: Dict[str, Any]) -> Dict[str, Any]:
"""
Process with context awareness
Expand All @@ -184,33 +244,25 @@ async def process(self, message: Dict[str, Any]) -> Dict[str, Any]:
Returns:
Dict[str, Any]: Context-aware response
"""
# Simulate beta-wave processing time without blocking
await asyncio.sleep(0.3) # 300ms

response = {
await asyncio.sleep(0.2) # 200ms for context processing
return {
"type": "responsive_response",
"content": f"Thoughtful response to: {message.get('content', '')}",
"context": "user_interaction",
"timestamp": time.time(),
}

self.publish("nova.responsive.output", response)
return response


class ReflectiveLayer(NOVALayer):
"""
Learning and adaptation layer (background processing)
Learning and adaptation layer (300-500ms)
Handles pattern learning and long-term adaptation.
Similar to alpha/theta wave processing in the brain.
Processing characteristics:
- Slowest response time
- Deep pattern analysis
- Pattern analysis
- Learning and adaptation
- Long-term memory integration
"""

@timed_process
async def process(self, message: Dict[str, Any]) -> Dict[str, Any]:
"""
Process for long-term learning and adaptation
Expand All @@ -221,19 +273,13 @@ async def process(self, message: Dict[str, Any]) -> Dict[str, Any]:
Returns:
Dict[str, Any]: Learning/adaptation response
"""
# Simulate alpha/theta-wave processing time without blocking
await asyncio.sleep(0.5) # 500ms

response = {
await asyncio.sleep(0.4) # 400ms for learning/adaptation
return {
"type": "reflective_update",
"pattern": "user_interaction_pattern",
"learning": f"Learned from: {message.get('content', '')}",
"timestamp": time.time(),
}

self.publish("nova.reflective.output", response)
return response


class NOVA:
"""
Expand All @@ -248,31 +294,34 @@ def __init__(self, kafka_config: Dict[str, Any]):
self.responsive = ResponsiveLayer(kafka_config)
self.reflective = ReflectiveLayer(kafka_config)

async def process_message(self, message: Dict[str, Any]):
"""
Process message through all layers asynchronously
async def process_message(self, message: Dict[str, Any]) -> Dict[str, Any]:
"""Process message through all layers in parallel"""
logger.info("Starting parallel processing", extra={"timestamp": time.time()})

Creates parallel tasks for each layer and waits for all results.
tasks = {
"reactive": self.reactive.process(message),
"responsive": self.responsive.process(message),
"reflective": self.reflective.process(message),
}

Args:
message (Dict[str, Any]): Input message to process
results = {}
for name, task in tasks.items():
try:
results[name] = await task
except Exception as e:
logger.error(f"Error in {name} layer", exc_info=True)
results[name] = None

Returns:
Dict[str, Any]: Combined results from all layers
"""
# Create tasks for each layer - now directly using async methods
reactive_task = asyncio.create_task(self.reactive.process(message))
responsive_task = asyncio.create_task(self.responsive.process(message))
reflective_task = asyncio.create_task(self.reflective.process(message))

# Wait for all tasks to complete
results = await asyncio.gather(reactive_task, responsive_task, reflective_task)
# Flush producers after all processing
try:
for layer in (self.reactive, self.responsive, self.reflective):
layer.producer.flush()
except Exception as e:
logger.error("Failed to flush producers", exc_info=True)
raise KafkaPublishError("Failed to flush Kafka producers") from e

return {
"reactive": results[0],
"responsive": results[1],
"reflective": results[2],
}
logger.info("All processing completed", extra={"timestamp": time.time()})
return results

    def close(self):
        """Clean up resources for all layers"""
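For reference, a hedged sketch of how the parallelized pipeline might be driven end to end. The broker address and message content below are assumptions for illustration, not values taken from this commit.

import asyncio

kafka_config = {"bootstrap.servers": "localhost:9092"}  # assumed local broker


async def main():
    nova = NOVA(kafka_config)
    try:
        results = await nova.process_message({"content": "Hello, NOVA"})
        for layer, result in results.items():
            # Successful layer results carry timing added by @timed_process;
            # failed layers come back as None
            duration = result["processing_duration"] if result else None
            print(layer, duration)
    finally:
        nova.close()


if __name__ == "__main__":
    asyncio.run(main())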

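The delivery_report callback notes that it cannot raise and suggests a retry mechanism. One possible shape, purely illustrative and not part of this commit, is a bounded retry around the synchronous publish() path; retrying on asynchronous delivery failures would additionally require tracking messages from the callback.

import time


def publish_with_retry(layer: NOVALayer, topic: str, message: dict, attempts: int = 3) -> None:
    # Illustrative helper: retry publish() a few times with a short
    # backoff before letting KafkaPublishError propagate to the caller.
    for attempt in range(1, attempts + 1):
        try:
            layer.publish(topic, message)
            return
        except KafkaPublishError:
            if attempt == attempts:
                raise
            time.sleep(0.1 * attempt)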