diff --git a/.gitignore b/.gitignore index 0a1f296..813c451 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ quicktime_video_hack *.wav *.ogg *.mp3 +*.mp4 # Binaries for programs and plugins main *.csv diff --git a/main.go b/main.go index fd46a83..dfa7f7a 100644 --- a/main.go +++ b/main.go @@ -31,7 +31,9 @@ Usage: qvh audio (--mp3 | --ogg | --wav) [--udid=] [-v] qvh gstreamer [--pipeline=] [--examples] [--udid=] [-v] qvh diagnostics [--dump=] [--udid=] + qvh test qvh --version | version + qvh test Options: @@ -88,13 +90,14 @@ The commands work as following: if err != nil { printErrJSON(err, "no device found to use") } - checkDeviceIsPaired(device) + //checkDeviceIsPaired(device) activateCommand, _ := arguments.Bool("activate") if activateCommand { activate(device) return } + audioCommand, _ := arguments.Bool("audio") if audioCommand { outfile, err := arguments.String("") @@ -140,7 +143,11 @@ The commands work as following: runDiagnostics(outfile, dump != "", dump, device) return } - + test, _ := arguments.Bool("test") + if test { + runtest(device) + return + } recordCommand, _ := arguments.Bool("record") if recordCommand { h264FilePath, err := arguments.String("") @@ -171,6 +178,68 @@ The commands work as following: } } +func runtest(device screencapture.IosDevice) { + + /* + gst-launch-1.0 -v videotestsrc ! gdppay ! shmsink socket-path=/tmp/blah shm-size=2000000 + + gst-launch-1.0 shmsrc socket-path=/tmp/blah ! gdpdepay ! queue ! videoconvert ! autovideosink + + curl http://example.com:8000/stream1.ogg ! gst-launch fdsrc fd=0 ! decodebin ! autoaudiosink + + */ + log.Debug("Starting Gstreamer") + gStreamer := gstadapter.New() + startWithConsumer(gStreamer, device, false) +} +func startWithConsumer(consumer screencapture.CmSampleBufConsumer, device screencapture.IosDevice, audioOnly bool) { + var err error + device, err = screencapture.EnableQTConfig(device) + if err != nil { + printErrJSON(err, "Error enabling QT config") + return + } + + adapter := screencapture.UsbAdapterNew{} + stopSignal := make(chan interface{}) + waitForSigInt(stopSignal) + + err = adapter.InitializeUSB(device) + if err != nil { + log.Fatalf("failed initializing usb with error %v for device %v", err, device) + } + + mp := screencapture.NewMessageProcessor(&adapter, stopSignal, consumer, audioOnly) + + //err = adapter.StartReading(device, &mp, stopSignal) + go func() { + for { + frame, err := adapter.ReadFrame() + if err != nil { + return + } + mp.ReceiveData(frame) + } + }() +<-stopSignal + consumer.Stop() + if err != nil { + printErrJSON(err, "failed connecting to usb") + } +} + +func waitForSigInt(stopSignalChannel chan interface{}) { + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + go func() { + for sig := range c { + log.Debugf("Signal received: %s", sig) + var stopSignal interface{} + stopSignalChannel <- stopSignal + } + }() +} + //findDevice grabs the first device on the host for a empty --udid //or tries to find the provided device otherwise func findDevice(udid string) (screencapture.IosDevice, error) { @@ -226,7 +295,7 @@ func recordAudioGst(outfile string, device screencapture.IosDevice, audiotype st printErrJSON(err, "Failed creating custom pipeline") return } - startWithConsumer(gStreamer, device, true) + screencapture.StartWithConsumer(gStreamer, device, true) } func runDiagnostics(outfile string, dump bool, dumpFile string, device screencapture.IosDevice) { @@ -238,10 +307,10 @@ func runDiagnostics(outfile string, dump bool, dumpFile string, device screencap defer metricsFile.Close() consumer := diagnostics.NewDiagnosticsConsumer(metricsFile, time.Second*10) if dump { - startWithConsumerDump(consumer, device, dumpFile) + screencapture.StartWithConsumerDump(consumer, device, dumpFile) return } - startWithConsumer(consumer, device, false) + screencapture.StartWithConsumer(consumer, device, false) } func recordAudioWav(outfile string, device screencapture.IosDevice) { @@ -268,7 +337,7 @@ func recordAudioWav(outfile string, device screencapture.IosDevice) { } }() - startWithConsumer(wavFileWriter, device, true) + screencapture.StartWithConsumer(wavFileWriter, device, true) } func startGStreamerWithCustomPipeline(device screencapture.IosDevice, pipelineString string) { @@ -278,13 +347,13 @@ func startGStreamerWithCustomPipeline(device screencapture.IosDevice, pipelineSt printErrJSON(err, "Failed creating custom pipeline") return } - startWithConsumer(gStreamer, device, false) + screencapture.StartWithConsumer(gStreamer, device, false) } func startGStreamer(device screencapture.IosDevice) { log.Debug("Starting Gstreamer") gStreamer := gstadapter.New() - startWithConsumer(gStreamer, device, false) + screencapture.StartWithConsumer(gStreamer, device, false) } // Just dump a list of what was discovered to the console @@ -353,72 +422,7 @@ func record(h264FilePath string, wavFilePath string, device screencapture.IosDev } }() - startWithConsumer(writer, device, false) -} - -func startWithConsumer(consumer screencapture.CmSampleBufConsumer, device screencapture.IosDevice, audioOnly bool) { - var err error - device, err = screencapture.EnableQTConfig(device) - if err != nil { - printErrJSON(err, "Error enabling QT config") - return - } - - adapter := screencapture.UsbAdapter{} - stopSignal := make(chan interface{}) - waitForSigInt(stopSignal) - - mp := screencapture.NewMessageProcessor(&adapter, stopSignal, consumer, audioOnly) - - err = adapter.StartReading(device, &mp, stopSignal) - consumer.Stop() - if err != nil { - printErrJSON(err, "failed connecting to usb") - } -} - -func startWithConsumerDump(consumer screencapture.CmSampleBufConsumer, device screencapture.IosDevice, dumpPath string) { - var err error - device, err = screencapture.EnableQTConfig(device) - if err != nil { - printErrJSON(err, "Error enabling QT config") - return - } - - inboundMessagesFile, err := os.Create("inbound-" + dumpPath) - if err != nil { - log.Fatalf("Could not open file: %v", err) - } - defer inboundMessagesFile.Close() - outboundMessagesFile, err := os.Create("outbound-" + dumpPath) - if err != nil { - log.Fatalf("Could not open file: %v", err) - } - defer outboundMessagesFile.Close() - log.Debug("Start dumping all binary transfer") - adapter := screencapture.UsbAdapter{Dump: true, DumpInWriter: inboundMessagesFile, DumpOutWriter: outboundMessagesFile} - stopSignal := make(chan interface{}) - waitForSigInt(stopSignal) - - mp := screencapture.NewMessageProcessor(&adapter, stopSignal, consumer, false) - - err = adapter.StartReading(device, &mp, stopSignal) - consumer.Stop() - if err != nil { - printErrJSON(err, "failed connecting to usb") - } -} - -func waitForSigInt(stopSignalChannel chan interface{}) { - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) - go func() { - for sig := range c { - log.Debugf("Signal received: %s", sig) - var stopSignal interface{} - stopSignalChannel <- stopSignal - } - }() + screencapture.StartWithConsumer(writer, device, false) } func checkDeviceIsPaired(device screencapture.IosDevice) { diff --git a/screencapture/gstadapter/gst_adapter.go b/screencapture/gstadapter/gst_adapter.go index 88c71c6..659a6fc 100644 --- a/screencapture/gstadapter/gst_adapter.go +++ b/screencapture/gstadapter/gst_adapter.go @@ -36,8 +36,9 @@ func New() *GstAdapter { audioAppSrc := setUpAudioPipelineBase(pl) setupLivePlayAudio(pl) - pl.SetState(gst.STATE_PLAYING) runGlibMainLoop() + pl.SetState(gst.STATE_PLAYING) + log.Info("Gstreamer is running!") gsta := GstAdapter{videoAppSrc: videoAppSrc, audioAppSrc: audioAppSrc, firstAudioSample: true} diff --git a/screencapture/interfaces.go b/screencapture/interfaces.go index 7e19e3a..4443df3 100644 --- a/screencapture/interfaces.go +++ b/screencapture/interfaces.go @@ -16,5 +16,11 @@ type UsbDataReceiver interface { //UsbWriter can be used to send data to a USB Endpoint type UsbWriter interface { - WriteDataToUsb(data []byte) + WriteDataToUsb(data []byte) error +} + +//UsbWriter can be used to send data to a USB Endpoint +type UsbWriterNew interface { + WriteDataToUsb(data []byte) error + ReadFrame() ([]byte, error) } diff --git a/screencapture/messageprocessor.go b/screencapture/messageprocessor.go index 47a60c0..aefd110 100644 --- a/screencapture/messageprocessor.go +++ b/screencapture/messageprocessor.go @@ -83,7 +83,7 @@ func (mp *MessageProcessor) handleSyncPacket(data []byte) { } log.Debugf("Rcv:%s", ogPacket.String()) - replyBytes := ogPacket.NewReply() + replyBytes := ogPacket.NewReply(0) log.Debugf("Send OG-REPLY {correlation:%x}", ogPacket.CorrelationID) mp.usbWriter.WriteDataToUsb(replyBytes) case packet.CWPA: diff --git a/screencapture/messageprocessor_test.go b/screencapture/messageprocessor_test.go index 48339d4..1ffbe4f 100644 --- a/screencapture/messageprocessor_test.go +++ b/screencapture/messageprocessor_test.go @@ -83,9 +83,9 @@ func TestMessageProcessorRespondsCorrectlyToSyncMessages(t *testing.T) { description: "Expect correct reply for cwpa", }, { - receivedData: loadFromFile("og-request")[4:], - expectedReply: [][]byte{loadFromFile("og-reply")}, - description: "Expect correct reply for og", + receivedData: loadFromFile("gocmd-request")[4:], + expectedReply: [][]byte{loadFromFile("gocmd-reply")}, + description: "Expect correct reply for gocmd", }, { receivedData: loadFromFile("stop-request")[4:], diff --git a/screencapture/packet/sync_og.go b/screencapture/packet/sync_og.go index 886c2d7..de45ec4 100644 --- a/screencapture/packet/sync_og.go +++ b/screencapture/packet/sync_og.go @@ -25,12 +25,12 @@ func NewSyncOgPacketFromBytes(data []byte) (SyncOgPacket, error) { } //NewReply returns a []byte containing the default reply for a SyncOgPacket -func (sp SyncOgPacket) NewReply() []byte { +func (sp SyncOgPacket) NewReply(response uint64) []byte { responseBytes := make([]byte, 24) binary.LittleEndian.PutUint32(responseBytes, 24) binary.LittleEndian.PutUint32(responseBytes[4:], ReplyPacketMagic) binary.LittleEndian.PutUint64(responseBytes[8:], sp.CorrelationID) - binary.LittleEndian.PutUint64(responseBytes[16:], 0) + binary.LittleEndian.PutUint64(responseBytes[16:], response) return responseBytes diff --git a/screencapture/usbadapter-new.go b/screencapture/usbadapter-new.go new file mode 100644 index 0000000..3a656ce --- /dev/null +++ b/screencapture/usbadapter-new.go @@ -0,0 +1,163 @@ +package screencapture + +import ( + "encoding/binary" + "fmt" + "io" + + "github.com/pkg/errors" + + "github.com/google/gousb" + log "github.com/sirupsen/logrus" +) + +//UsbAdapterNew reads and writes from AV Quicktime USB Bulk endpoints +type UsbAdapterNew struct { + outEndpoint *gousb.OutEndpoint + Dump bool + DumpOutWriter io.Writer + DumpInWriter io.Writer + stream *gousb.ReadStream + usbDevice *gousb.Device + contextClose func() + iface *gousb.Interface + iosDevice IosDevice +} + +//WriteDataToUsb implements the UsbWriter interface and sends the byte array to the usb bulk endpoint. +func (usbAdapter *UsbAdapterNew) WriteDataToUsb(bytes []byte) error { + _, err := usbAdapter.outEndpoint.Write(bytes) + if err != nil { + return err + } + if usbAdapter.Dump { + _, err := usbAdapter.DumpOutWriter.Write(bytes) + if err != nil { + log.Fatalf("Failed dumping data:%v", err) + } + } + return nil +} + +func (usbAdapter *UsbAdapterNew) InitializeUSB(device IosDevice) error { + ctx, cleanUp := createContext() + usbAdapter.contextClose = cleanUp + usbAdapter.outEndpoint = &gousb.OutEndpoint{} + usbAdapter.stream = &gousb.ReadStream{} + usbAdapter.usbDevice = &gousb.Device{} + usbAdapter.iface = &gousb.Interface{} + usbAdapter.iosDevice = device + + usbDevice, err := OpenDevice(ctx, device) + if err != nil { + return err + } + if !device.IsActivated() { + return errors.New("device not activated for screen mirroring") + } + + confignum, _ := usbDevice.ActiveConfigNum() + log.Debugf("Config is active: %d, QT config is: %d", confignum, device.QTConfigIndex) + + config, err := usbDevice.Config(device.QTConfigIndex) + if err != nil { + return errors.New("Could not retrieve config") + } + + log.Debugf("QT Config is active: %s", config.String()) + + iface, err := findAndClaimQuickTimeInterface(config) + if err != nil { + log.Debug("could not get Quicktime Interface") + return err + } + log.Debugf("Got QT iface:%s", iface.String()) + + inboundBulkEndpointIndex, inboundBulkEndpointAddress, err := findBulkEndpoint(iface.Setting, gousb.EndpointDirectionIn) + if err != nil { + return err + } + + outboundBulkEndpointIndex, outboundBulkEndpointAddress, err := findBulkEndpoint(iface.Setting, gousb.EndpointDirectionOut) + if err != nil { + return err + } + + err = clearFeature(usbDevice, inboundBulkEndpointAddress, outboundBulkEndpointAddress) + if err != nil { + return err + } + + inEndpoint, err := iface.InEndpoint(inboundBulkEndpointIndex) + if err != nil { + log.Error("couldnt get InEndpoint") + return err + } + log.Debugf("Inbound Bulk: %s", inEndpoint.String()) + + outEndpoint, err := iface.OutEndpoint(outboundBulkEndpointIndex) + if err != nil { + log.Error("couldnt get OutEndpoint") + return err + } + + stream, err := inEndpoint.NewStream(4096, 5) + if err != nil { + log.Fatal("couldnt create stream") + return err + } + log.Debug("Endpoint claimed") + log.Debugf("Outbound Bulk: %s", outEndpoint.String()) + usbAdapter.outEndpoint = outEndpoint + usbAdapter.stream = stream + usbAdapter.usbDevice = usbDevice + usbAdapter.iface = iface + + return nil +} + +func (usbAdapter *UsbAdapterNew) Close() error { + log.Info("Closing usb stream") + + err := usbAdapter.stream.Close() + if err != nil { + log.Error("Error closing stream", err) + } + log.Info("Closing usb interface") + usbAdapter.iface.Close() + + sendQTDisableConfigControlRequest(usbAdapter.usbDevice) + log.Debug("Resetting device config") + _, err = usbAdapter.usbDevice.Config(usbAdapter.iosDevice.UsbMuxConfigIndex) + if err != nil { + log.Warn(err) + } + usbAdapter.contextClose() + return nil +} + +func (usbAdapter *UsbAdapterNew) ReadFrame() ([]byte, error) { + lengthBuffer := make([]byte, 4) + for { + n, err := io.ReadFull(usbAdapter.stream, lengthBuffer) + if err != nil { + return []byte{}, fmt.Errorf("failed reading 4bytes length with err:%s only received: %d", err, n) + } + //the 4 bytes header are included in the length, so we need to subtract them + //here to know how long the payload will be + length := binary.LittleEndian.Uint32(lengthBuffer) - 4 + dataBuffer := make([]byte, length) + + n, err = io.ReadFull(usbAdapter.stream, dataBuffer) + if err != nil { + return []byte{}, err + } + if usbAdapter.Dump { + _, err := usbAdapter.DumpInWriter.Write(dataBuffer) + if err != nil { + log.Fatalf("Failed dumping data:%v", err) + } + } + return dataBuffer, nil + } +} diff --git a/screencapture/utils.go b/screencapture/utils.go new file mode 100644 index 0000000..cb5adc1 --- /dev/null +++ b/screencapture/utils.go @@ -0,0 +1,130 @@ +package screencapture + +import ( + log "github.com/sirupsen/logrus" + "os" + "os/signal" +) + +func StartWithConsumer(consumer CmSampleBufConsumer, device IosDevice, audioOnly bool) { + var err error + device, err = EnableQTConfig(device) + if err != nil { + log.Fatalf("error enabling QT config %v for device %v", err, device) + } + + usbAdapter := &UsbAdapterNew{} + err = usbAdapter.InitializeUSB(device) + if err != nil { + log.Fatalf("failed initializing usb with error %v for device %v", err, device) + } + + valeriaInterface := NewValeriaInterface(usbAdapter) + defer CloseAll(usbAdapter, valeriaInterface) + go func() { + err := valeriaInterface.StartReadLoop() + log.Info("Valeria read loop stopped.") + if err != nil { + log.Errorf("Valeria read loop stopped with error %v", err) + } + }() + setupSession(valeriaInterface) + + go func() { + for { + buf := valeriaInterface.Local.ReadSampleBuffer() + err := valeriaInterface.Remote.RequestSampleData() + if err != nil { + log.Debug("failed sending need") + return + } + err = consumer.Consume(buf) + if err != nil { + log.Warnf("consumer %v failed to consume buffer %v with error %v", consumer, buf, err) + } + } + }() + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + <-c +} +func StartWithConsumerDump(consumer CmSampleBufConsumer, device IosDevice, dumpPath string){} + +func setupSession(valeriaInterface ValeriaInterface) { + err := valeriaInterface.Local.AwaitPing() + if err != nil { + log.Errorf("ping timed out failed %v", err) + return + } + + log.Info("ping received, responding..") + err = valeriaInterface.Remote.Ping() + if err != nil { + log.Errorf("failed sending Ping %v", err) + return + } + + log.Info("handshake complete, awaiting audio clock sync") + err = valeriaInterface.Local.AwaitAudioClockSync() + if err != nil { + log.Errorf("audio clock sync failed %v", err) + return + } + + log.Info("audio clock sync ok, enabling video") + err = valeriaInterface.Remote.EnableVideo() + if err != nil { + log.Errorf("failed enabling video %v", err) + return + } + + log.Infof("enabling audio") + err = valeriaInterface.Remote.EnableAudio() + if err != nil { + log.Errorf("failed enabling audio %v", err) + return + } + + log.Info("awaiting video clock sync") + err = valeriaInterface.Local.AwaitVideoClockSync() + if err != nil { + log.Errorf("failed waiting for video clock sync %v", err) + return + } + + log.Info("sending initial sample data request") + err = valeriaInterface.Remote.RequestSampleData() + if err != nil { + log.Errorf("failed requesting sample data %v", err) + return + } +} + +func CloseAll(usbAdapter *UsbAdapterNew, valeriaInterface ValeriaInterface) { + log.Info("stopping audio") + err := valeriaInterface.Remote.StopAudio() + if err != nil { + log.Errorf("error stopping audio", err) + } + + log.Info("stopping video") + err = valeriaInterface.Remote.StopVideo() + if err != nil { + log.Errorf("error stopping video", err) + } + log.Info("awaiting audio release") + err = valeriaInterface.Local.AwaitAudioClockRelease() + if err != nil { + log.Errorf("error waiting audio clock release", err) + } + + log.Info("awaiting video release") + err = valeriaInterface.Local.AwaitVideoClockRelease() + if err != nil { + log.Errorf("error waiting video clock release", err) + } + + log.Info("shutting down usbadapter") + err = usbAdapter.Close() + log.Info("Stream closed successfullly, good bye :-)") +} diff --git a/screencapture/valeriainterface.go b/screencapture/valeriainterface.go new file mode 100644 index 0000000..948ac7b --- /dev/null +++ b/screencapture/valeriainterface.go @@ -0,0 +1,456 @@ +package screencapture + +import ( + "encoding/binary" + "fmt" + "github.com/danielpaulus/quicktime_video_hack/screencapture/coremedia" + "github.com/danielpaulus/quicktime_video_hack/screencapture/packet" + log "github.com/sirupsen/logrus" + "time" +) + +type ValeriaInterface struct { + Local LocalValeriaApi + Remote DeviceValeriaAPI + errorChannel chan error + closeChannel chan interface{} +} + +type DataHolder struct { + localAudioClock coremedia.CMClock + deviceAudioClockRef packet.CFTypeID + remoteVideoClockRef packet.CFTypeID + clock coremedia.CMClock + startTimeLocalAudioClock coremedia.CMTime + lastEatFrameReceivedLocalAudioClockTime coremedia.CMTime + startTimeDeviceAudioClock coremedia.CMTime + lastEatFrameReceivedDeviceAudioClockTime coremedia.CMTime + audioSamplesReceived uint64 + firstAudioTimeTaken bool + videoSamplesReceived uint64 + localVideoClockRef packet.CFTypeID +} + +type LocalValeriaApi struct { + remote DeviceValeriaAPI + pingChannel chan error + audioClockChannel chan error + dataHolder DataHolder + videoClockChannel chan error + sampleDataChannel chan coremedia.CMSampleBuffer + audioReleaseChannel chan error + videoReleaseChannel chan error +} + +type DeviceValeriaAPI struct { + usbAdapter UsbWriterNew + dataHolder DataHolder +} + +func NewValeriaInterface(usbAdapter UsbWriterNew) ValeriaInterface { + dataHolder := DataHolder{} + local := LocalValeriaApi{ + pingChannel: make(chan error, 1), + audioClockChannel: make(chan error, 1), + videoClockChannel: make(chan error, 1), + audioReleaseChannel: make(chan error, 1), + videoReleaseChannel: make(chan error, 1), + dataHolder: dataHolder, + sampleDataChannel: make(chan coremedia.CMSampleBuffer, 50), + } + remote := DeviceValeriaAPI{dataHolder: dataHolder, usbAdapter: usbAdapter} + valeriaIface := ValeriaInterface{Local: local, + errorChannel: make(chan error, 1), + closeChannel: make(chan interface{}), + Remote: remote, + } + return valeriaIface +} + +// StartReadLoop claims&opens the USB Device and starts listening to RPC calls +// and blocks until ValeriaInterface is closed or an error occurs. +func (v *ValeriaInterface) StartReadLoop() error { + return readLoop(v) +} + +func (d DeviceValeriaAPI) StopAudio() error { + return d.usbAdapter.WriteDataToUsb(packet.NewAsynHPA0(d.dataHolder.deviceAudioClockRef)) +} + +func (d DeviceValeriaAPI) StopVideo() error { + return d.usbAdapter.WriteDataToUsb(packet.NewAsynHPD0()) +} + +func (l LocalValeriaApi) AwaitAudioClockRelease() error { + return awaitOrTimeout(l.audioReleaseChannel, "audio clock release") +} + +func (l LocalValeriaApi) AwaitVideoClockRelease() error { + return awaitOrTimeout(l.videoReleaseChannel, "video clock release") +} + +func (l LocalValeriaApi) AwaitVideoClockSync() error { + return awaitOrTimeout(l.videoClockChannel, "video clock") +} + +func (l LocalValeriaApi) AwaitAudioClockSync() error { + return awaitOrTimeout(l.audioClockChannel, "audio clock") +} + +func (l LocalValeriaApi) AwaitPing() error { + return awaitOrTimeout(l.pingChannel, "waiting for ping") +} + +func awaitOrTimeout(channel chan error, loggerTag string) error { + select { + case <-time.After(5 * time.Second): + return fmt.Errorf("%s timed out. restart the device please it might be buggy", loggerTag) + case err := <-channel: + if err != nil { + return fmt.Errorf("failed '%s' with device %v", loggerTag, err) + } + } + log.Infof("%s succeeded", loggerTag) + return nil +} + +func (l LocalValeriaApi) ping() { + l.pingChannel <- nil +} + +//I don't know what the !go command is for. It seems we always just return 0 and it works. +func (l LocalValeriaApi) gocmd(unknown uint32) uint64 { + log.Debugf("go! %d", unknown) + return 0 +} + +func (l *LocalValeriaApi) setupAudioClock(deviceClockRef packet.CFTypeID) packet.CFTypeID { + clockRef := deviceClockRef + 1000 + + l.dataHolder.localAudioClock = coremedia.NewCMClockWithHostTime(clockRef) + l.dataHolder.deviceAudioClockRef = deviceClockRef + l.audioClockChannel <- nil + return clockRef +} + +//this message is always the same, so we just prepare it once and send the same bytes all the time +var needMessage []byte + +func (l *LocalValeriaApi) setupVideoClock(deviceClockRef packet.CFTypeID) packet.CFTypeID { + l.dataHolder.remoteVideoClockRef = deviceClockRef + l.dataHolder.localVideoClockRef = deviceClockRef + 0x1000AF + needMessage = packet.AsynNeedPacketBytes(deviceClockRef) + l.videoClockChannel <- nil + return l.dataHolder.localVideoClockRef +} + +func (l *LocalValeriaApi) setupMainClock(ref packet.CFTypeID) packet.CFTypeID { + clockRef := ref + 0x10000 + l.dataHolder.clock = coremedia.NewCMClockWithHostTime(clockRef) + return clockRef +} + +func (l LocalValeriaApi) time() coremedia.CMTime { + return l.dataHolder.clock.GetTime() +} + +func (l LocalValeriaApi) stop() { + log.Info("device sent STOP command") +} + +func (l LocalValeriaApi) skew() float64 { + return coremedia.CalculateSkew( + l.dataHolder.startTimeLocalAudioClock, + l.dataHolder.lastEatFrameReceivedLocalAudioClockTime, + l.dataHolder.startTimeDeviceAudioClock, + l.dataHolder.lastEatFrameReceivedDeviceAudioClockTime) +} + +//TODO: make this closeable to prevent goroutine leaks +//ReadSampleBuffer blocks until a buffer is received or the interface is closed +func (l LocalValeriaApi) ReadSampleBuffer() coremedia.CMSampleBuffer { + return <-l.sampleDataChannel +} + +func (l LocalValeriaApi) receiveAudioSample(buf coremedia.CMSampleBuffer) { + if !l.dataHolder.firstAudioTimeTaken { + l.dataHolder.startTimeDeviceAudioClock = buf.OutputPresentationTimestamp + l.dataHolder.startTimeLocalAudioClock = l.dataHolder.localAudioClock.GetTime() + l.dataHolder.lastEatFrameReceivedDeviceAudioClockTime = buf.OutputPresentationTimestamp + l.dataHolder.lastEatFrameReceivedLocalAudioClockTime = l.dataHolder.startTimeLocalAudioClock + l.dataHolder.firstAudioTimeTaken = true + } else { + l.dataHolder.lastEatFrameReceivedDeviceAudioClockTime = buf.OutputPresentationTimestamp + l.dataHolder.lastEatFrameReceivedLocalAudioClockTime = l.dataHolder.localAudioClock.GetTime() + } + + l.sampleDataChannel <- buf + if log.IsLevelEnabled(log.DebugLevel) { + l.dataHolder.audioSamplesReceived++ + if l.dataHolder.audioSamplesReceived%100 == 0 { + log.Debugf("RCV Audio Samples:%d", l.dataHolder.audioSamplesReceived) + } + } +} + +func (l LocalValeriaApi) feed(buf coremedia.CMSampleBuffer) { + l.sampleDataChannel <- buf + if log.IsLevelEnabled(log.DebugLevel) { + l.dataHolder.videoSamplesReceived++ + if l.dataHolder.videoSamplesReceived%500 == 0 { + log.Debugf("Rcv'd(%d) last:%s", l.dataHolder.videoSamplesReceived, buf.String()) + l.dataHolder.videoSamplesReceived = 0 + } + } +} + +func (l LocalValeriaApi) timeJump(unknown []byte) { + +} + +func (l LocalValeriaApi) setProperties(property coremedia.StringKeyEntry, clockRef packet.CFTypeID) { + +} + +func (l LocalValeriaApi) setClockRate(clockRef packet.CFTypeID, cmTime coremedia.CMTime, rate1 float32, rate2 float32) { + +} + +func (l LocalValeriaApi) setTimeBase(ref packet.CFTypeID, ref2 packet.CFTypeID) { + +} + +func (l LocalValeriaApi) release(clockRef packet.CFTypeID) { + if clockRef == l.dataHolder.localVideoClockRef { + l.videoReleaseChannel <- nil + return + } + if clockRef == l.dataHolder.clock.ID { + l.audioReleaseChannel <- nil + return + } + log.Warnf( + ` + release for unknown clock received %x -- localaudio:%x remoteaudio:%x + remotevideo:%x localvideo:%x mainclock:%x`, + clockRef, l.dataHolder.localAudioClock.ID, l.dataHolder.deviceAudioClockRef, + l.dataHolder.remoteVideoClockRef, l.dataHolder.localVideoClockRef, l.dataHolder.clock.ID) +} + +func (d DeviceValeriaAPI) RequestSampleData() error { + log.Debugf("Send NEED %x", d.dataHolder.remoteVideoClockRef) + return d.usbAdapter.WriteDataToUsb(needMessage) +} + +func (d DeviceValeriaAPI) EnableVideo() error { + deviceInfo := packet.NewAsynHpd1Packet(packet.CreateHpd1DeviceInfoDict()) + log.Debug("Sending ASYN HPD1") + err := d.usbAdapter.WriteDataToUsb(deviceInfo) + if err != nil { + return err + } + log.Debug("Sending ASYN HPD1") + return d.usbAdapter.WriteDataToUsb(deviceInfo) +} + +func (d DeviceValeriaAPI) EnableAudio() error { + + deviceInfo1 := packet.NewAsynHpa1Packet(packet.CreateHpa1DeviceInfoDict(), d.dataHolder.deviceAudioClockRef) + log.Debug("Sending ASYN HPA1") + return d.usbAdapter.WriteDataToUsb(deviceInfo1) +} + +func (d DeviceValeriaAPI) Ping() error { + return d.usbAdapter.WriteDataToUsb(packet.NewPingPacketAsBytes()) +} + +//readLoop reads messages sent by the device and dispatches them to the local api +func readLoop(v *ValeriaInterface) error { + for { + select { + case err := <-v.errorChannel: + return err + case <-v.closeChannel: + return nil + default: + frame, err := v.Remote.usbAdapter.ReadFrame() + if err != nil { + return err + } + handleFrame(frame, v) + } + + } +} + +// Decode Remote rpc calls and forward them to the local Valeria API +func handleFrame(data []byte, valeria *ValeriaInterface) { + switch binary.LittleEndian.Uint32(data) { + case packet.PingPacketMagic: + valeria.Local.ping() + case packet.SyncPacketMagic: + err := handleSyncPacket(data, valeria) + if err != nil { + valeria.errorChannel <- err + return + } + return + case packet.AsynPacketMagic: + err := handleAsyncPacket(data, valeria) + if err != nil { + valeria.errorChannel <- err + return + } + return + default: + valeria.errorChannel <- fmt.Errorf("received unknown packet type: %x", data[:4]) + } +} + +func handleSyncPacket(data []byte, valeria *ValeriaInterface) error { + switch binary.LittleEndian.Uint32(data[12:]) { + case packet.OG: + ogPacket, err := packet.NewSyncOgPacketFromBytes(data) + if err != nil { + log.Error("Error parsing OG packet", err) + } + log.Debugf("Rcv:%s", ogPacket.String()) + response := valeria.Local.gocmd(ogPacket.Unknown) + replyBytes := ogPacket.NewReply(response) + return valeria.Remote.usbAdapter.WriteDataToUsb(replyBytes) + case packet.CWPA: + cwpaPacket, err := packet.NewSyncCwpaPacketFromBytes(data) + if err != nil { + return fmt.Errorf("failed parsing cwpa packet %v", err) + } + log.Debugf("Rcv:%s", cwpaPacket.String()) + clockRef := valeria.Local.setupAudioClock(cwpaPacket.DeviceClockRef) + + log.Debugf("Send CWPA-RPLY {correlation:%x, clockRef:%x}", cwpaPacket.CorrelationID, clockRef) + return valeria.Remote.usbAdapter.WriteDataToUsb(cwpaPacket.NewReply(clockRef)) + + case packet.CVRP: + cvrpPacket, err := packet.NewSyncCvrpPacketFromBytes(data) + if err != nil { + return fmt.Errorf("error parsing CVRP packet %v", err) + } + log.Debugf("Rcv:%s", cvrpPacket.String()) + videoClockRef := valeria.Local.setupVideoClock(cvrpPacket.DeviceClockRef) + + log.Debugf("Send CVRP-RPLY {correlation:%x, clockRef:%x}", cvrpPacket.CorrelationID, videoClockRef) + return valeria.Remote.usbAdapter.WriteDataToUsb(cvrpPacket.NewReply(videoClockRef)) + case packet.CLOK: + clokPacket, err := packet.NewSyncClokPacketFromBytes(data) + if err != nil { + log.Error("Failed parsing Clok Packet", err) + } + log.Debugf("Rcv:%s", clokPacket.String()) + clockRef := valeria.Local.setupMainClock(clokPacket.ClockRef) + + log.Debugf("Send CLOK-RPLY {correlation:%x, clockRef:%x}", clokPacket.CorrelationID, clockRef) + return valeria.Remote.usbAdapter.WriteDataToUsb(clokPacket.NewReply(clockRef)) + case packet.TIME: + timePacket, err := packet.NewSyncTimePacketFromBytes(data) + if err != nil { + log.Error("Error parsing TIME SYNC packet", err) + } + log.Debugf("Rcv:%s", timePacket.String()) + timeToSend := valeria.Local.time() + replyBytes, err := timePacket.NewReply(timeToSend) + if err != nil { + return fmt.Errorf("could not create SYNC TIME REPLY") + } + log.Debugf("Send TIME-REPLY {correlation:%x, time:%s}", timePacket.CorrelationID, timeToSend) + return valeria.Remote.usbAdapter.WriteDataToUsb(replyBytes) + //TODO: turn into nice API function + case packet.AFMT: + afmtPacket, err := packet.NewSyncAfmtPacketFromBytes(data) + if err != nil { + log.Error("Error parsing SYNC AFMT packet", err) + } + log.Debugf("Rcv:%s", afmtPacket.String()) + + replyBytes := afmtPacket.NewReply() + log.Debugf("Send AFMT-REPLY {correlation:%x}", afmtPacket.CorrelationID) + return valeria.Remote.usbAdapter.WriteDataToUsb(replyBytes) + case packet.SKEW: + skewPacket, err := packet.NewSyncSkewPacketFromBytes(data) + if err != nil { + log.Error("Error parsing SYNC SKEW packet", err) + } + skewValue := valeria.Local.skew() + log.Debugf("Rcv:%s Reply:%f", skewPacket.String(), skewValue) + return valeria.Remote.usbAdapter.WriteDataToUsb(skewPacket.NewReply(skewValue)) + case packet.STOP: + stopPacket, err := packet.NewSyncStopPacketFromBytes(data) + if err != nil { + log.Error("Error parsing SYNC STOP packet", err) + } + valeria.Local.stop() + log.Debugf("Rcv:%s", stopPacket.String()) + return valeria.Remote.usbAdapter.WriteDataToUsb(stopPacket.NewReply()) + default: + return fmt.Errorf("received unknown sync packet type: %x", data) + } +} + +func handleAsyncPacket(data []byte, valeria *ValeriaInterface) error { + switch binary.LittleEndian.Uint32(data[12:]) { + case packet.EAT: + eatPacket, err := packet.NewAsynCmSampleBufPacketFromBytes(data) + if err != nil { + return fmt.Errorf("eat packet could not be unmarshalled %v", err) + } + valeria.Local.receiveAudioSample(eatPacket.CMSampleBuf) + return nil + case packet.FEED: + feedPacket, err := packet.NewAsynCmSampleBufPacketFromBytes(data) + if err != nil { + return fmt.Errorf("error parsing FEED packet: %x %s", data, err) + } + valeria.Local.feed(feedPacket.CMSampleBuf) + return nil + case packet.SPRP: + sprpPacket, err := packet.NewAsynSprpPacketFromBytes(data) + if err != nil { + return fmt.Errorf("error parsing SPRP packet %v", err) + } + valeria.Local.setProperties(sprpPacket.Property, sprpPacket.ClockRef) + log.Debugf("Rcv:%s", sprpPacket.String()) + return nil + case packet.TJMP: + tjmpPacket, err := packet.NewAsynTjmpPacketFromBytes(data) + if err != nil { + return fmt.Errorf("error parsing tjmp packet %v", err) + } + valeria.Local.timeJump(tjmpPacket.Unknown) + log.Debugf("Rcv:%s", tjmpPacket.String()) + return nil + case packet.SRAT: + sratPacket, err := packet.NewAsynSratPacketFromBytes(data) + if err != nil { + return fmt.Errorf("error parsing srat packet %v", err) + } + valeria.Local.setClockRate(sratPacket.ClockRef, sratPacket.Time, sratPacket.Rate1, sratPacket.Rate2) + log.Debugf("Rcv:%s", sratPacket.String()) + return nil + case packet.TBAS: + tbasPacket, err := packet.NewAsynTbasPacketFromBytes(data) + if err != nil { + return fmt.Errorf("error parsing tbas packet %v", err) + } + valeria.Local.setTimeBase(tbasPacket.ClockRef, tbasPacket.SomeOtherRef) + log.Debugf("Rcv:%s", tbasPacket.String()) + return nil + case packet.RELS: + relsPacket, err := packet.NewAsynRelsPacketFromBytes(data) + if err != nil { + return fmt.Errorf("error parsing RELS packet %v", err) + } + valeria.Local.release(relsPacket.ClockRef) + log.Debugf("Rcv:%s", relsPacket.String()) + return nil + default: + return fmt.Errorf("received unknown async packet type: %x", data) + } +}