/* * libiec61883 - Linux IEEE 1394 streaming media library. * Copyright (C) 2004 Kristian Hogsberg, Dan Dennedy, and Dan Maas. * This file orginially written in C++ by Dan Maas; ported to C by Dan Dennedy. * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifdef HAVE_CONFIG_H #include #endif #include #include #include #include #include "deque.h" #include "tsbuffer.h" // max # of packets to look ahead for PCRs // reasonable values: 1000 - 10000 #define MAX_PCR_LOOKAHEAD 20000 // # of PCRs to average over when estimating bitrate // reasonable values: 1-100 #define PCR_SMOOTH_INTERVAL 5 // approximate # of ISO cycles transmission delay // valid range is 0-10; good values are 5-15 #define SYT_OFFSET 7 // leave this off for now; I don't think it's needed with the new algorithm #define ENABLE_PCR_DRIFT_CORRECTION 0 // # of seconds between PCR bitrate drift checks // reasonable values: 1-5 #define PCR_DRIFT_INTERVAL 1 typedef unsigned char u8; typedef unsigned int u32; typedef unsigned long long u64; typedef signed long long s64; /* 8-byte CIP (Common Isochronous Packet) header */ struct CIP_header { u8 b[ 8 ]; }; static void fill_mpeg_cip_header( struct CIP_header *cip, u8 src_node_id, u8 counter ) { cip->b[ 0 ] = src_node_id; cip->b[ 1 ] = 0x06; cip->b[ 2 ] = 0xC4; cip->b[ 3 ] = counter; cip->b[ 4 ] = 0xA0; cip->b[ 5 ] = 0x80; // "time-shifted" flag; also try 0x00 cip->b[ 6 ] = 0x00; cip->b[ 7 ] = 0x00; } /* MPEG-2 TS timestamp */ typedef u32 sph_t; static inline u32 mpeg_ts_offset( sph_t t ) { return t & 0xfff; } static inline u32 mpeg_ts_count( sph_t t ) { return ( t >> 12 ) & 0x1fff; } static inline sph_t make_sph( u32 count, u32 offset ) { return ( ( count & 0x1fff ) << 12 ) | ( offset & 0xfff ); } /* MPEG-2 TSP packet (timestamp plus TS packet) */ struct TSP_packet { sph_t sph; unsigned char data[ 188 ]; }; // one ISO cycle struct buf_cycle { struct CIP_header header; struct TSP_packet packet[ 3 ]; // up to three TSP packets }; // 188-byte MPEG-2 TS packet struct mpeg2_ts { u8 ts_header[ 4 ]; u8 adapt_len; u8 adapt_flags; u8 pcr[ 6 ]; u8 data[ 176 ]; }; static int ts_get_pid( struct mpeg2_ts *ts ) { return ( ( ts->ts_header[ 1 ] << 8 ) + ts->ts_header[ 2 ] ) & 0x1fff; } // returns the PCR clock, in units of 1 / 27MHz static u64 ts_get_pcr( struct mpeg2_ts *ts ) { u64 pcr; pcr = ts->pcr[ 0 ] << 25; pcr += ts->pcr[ 1 ] << 17; pcr += ts->pcr[ 2 ] << 9; pcr += ts->pcr[ 3 ] << 1; pcr += ( ts->pcr[ 4 ] & 0x80 ) >> 7; pcr *= 300; pcr += ( ts->pcr[ 4 ] & 0x1 ) << 8; pcr += ts->pcr[ 5 ]; return pcr; } static int ts_has_pcr( struct mpeg2_ts *ts, int pid ) { if ( ( pid == -1 ) || ( pid == ts_get_pid( ts ) ) ) { if ( ( ts->ts_header[ 3 ] & 0x20 ) // adaptation field present && ( ts->adapt_len > 0 ) && ( ts->adapt_flags & 0x10 ) ) { return 1; } } return 0; } /** tsbuffer ******************************************************************/ struct tsbuffer { // queue of TS packets waiting to be sent iec61883_deque_t ts_queue; iec61883_mpeg2_xmit_t read_packet; void *callback_data; unsigned int dropped; // PCR state machine u64 last_pcr; // last PCR seen u64 pcr_drift_ref; // PCR from which to base drift calculations u32 pcr_drift_cycles; // how many cycles have passed since pcr_drift_ref was set u32 packets_since_last_pcr; u64 delta_pcr_per_packet; // packetization state machine // num/denom algorithm for determining # of TS packets to send out in each ISO cycle u64 tsp_accum; u64 tsp_whole; u64 tsp_num; u64 tsp_denom; int selected_pid; // ISO continuity counter u32 iso_counter; }; tsbuffer_t tsbuffer_init (iec61883_mpeg2_xmit_t read_cb, void *callback_data, int pid) { tsbuffer_t this = (tsbuffer_t) calloc (1, sizeof (struct tsbuffer)); if (this) { // initialize members this->last_pcr = 0; this->pcr_drift_ref = 0; this->pcr_drift_cycles = 0; this->tsp_accum = 0; this->iso_counter = 0; this->selected_pid = pid; this->ts_queue = iec61883_deque_init(); this->read_packet = read_cb; this->callback_data = callback_data; this->dropped = 0; // skip ahead to the first PCR tsbuffer_read_to_next_pcr (this); this->last_pcr = ts_get_pcr (iec61883_deque_back (this->ts_queue)); // dump the useless packets that precede the first PCR while (iec61883_deque_size (this->ts_queue) > 0) free (iec61883_deque_pop_front (this->ts_queue)); tsbuffer_refill (this); } return this; } void tsbuffer_set_pid (tsbuffer_t this, int pid) { this->selected_pid = pid; } int tsbuffer_read_ts (tsbuffer_t this) { struct mpeg2_ts* new_ts = calloc (1, sizeof (struct mpeg2_ts)); unsigned char *ts = (unsigned char*) new_ts; if (this->read_packet (ts, 1, this->dropped, this->callback_data) < 0) return 0; /* Do not necessarily indicate dropped packet on next call; rawiso handler will set again when needed. */ this->dropped = 0; iec61883_deque_push_back (this->ts_queue, new_ts); return 1; } int tsbuffer_read_to_next_pcr (tsbuffer_t this) { do { if (iec61883_deque_size (this->ts_queue) > MAX_PCR_LOOKAHEAD) { fprintf (stderr, "couldn't find a PCR within %d packets; giving up\n", MAX_PCR_LOOKAHEAD); fprintf (stderr, "(try reducing PCR_SMOOTH_INTERVAL or increase MAX_PCR_LOOKAHEAD\n"); return 0; } if (tsbuffer_read_ts (this) == 0) return 0; if (this->selected_pid == -1 && ts_has_pcr(iec61883_deque_back (this->ts_queue), -1)) this->selected_pid = ts_get_pid (iec61883_deque_back (this->ts_queue)); } while (ts_has_pcr (iec61883_deque_back (this->ts_queue), this->selected_pid) == 0); return 1; } int tsbuffer_refill (tsbuffer_t this) { unsigned int i; u32 n_packets; u64 pcr; u64 delta_pcr; u64 num; u64 denom; for (i = 0; i < PCR_SMOOTH_INTERVAL; i++) if (tsbuffer_read_to_next_pcr (this) == 0) return 0; n_packets = iec61883_deque_size (this->ts_queue); pcr = ts_get_pcr (iec61883_deque_back (this->ts_queue)); if (this->pcr_drift_ref == 0) { // set up a PCR drift calculation this->pcr_drift_ref = pcr; this->pcr_drift_cycles = 0; } delta_pcr = pcr - this->last_pcr; this->delta_pcr_per_packet = delta_pcr / n_packets; // Calculate the TSP packetization rate /* This is an *exact* calculation! We want to send n_packets packets in the period of time covered by delta_pcr. The packet transmission rate should therefore be: (n_packets / delta_pcr) * (27,000,000 PCRs/sec) But what we really want is the packet transmission rate *per ISO cycle*, not per second. So, divide by 8000 ISO cycles/sec: (n_packets / delta_pcr) * (27,000,000 / sec) / (8000 cycles /sec) And then something magic happens: 27,000,000 / 8,000 = 3,375 exactly! So the *exact* transmission rate is: (n_packets / delta_pcr) * (3,375) (per ISO cycle) We use a standard numerator/denominator algorithm to achieve this. */ num = n_packets * 3375; denom = delta_pcr; this->tsp_whole = num / denom; this->tsp_num = num % denom; this->tsp_denom = denom; /* note: We don't reset tsp_accum to zero. This should improve our accuracy, as long as the transmission rate stays fairly constant. */ #if 0 // for my SD streams: 1 90/2981 is best fprintf( stderr, "PCR (PID %d) after %d packets %llu delta = %llu TSP wh %llu num %llu den %llu\n", ts_get_pid (iec61883_deque_back (this->ts_queue)), n_packets, pcr, delta_pcr, this->tsp_whole, this->tsp_num, this->tsp_denom ); #endif this->last_pcr = pcr; this->packets_since_last_pcr = 0; return 1; } u32 tsbuffer_send_iso_cycle (tsbuffer_t this, void *data, u32 iso_cycle, u8 src_node_id, unsigned int dropped) { // choose # of tsps by num/denom algorithm unsigned int n_tsps = this->tsp_whole; unsigned int i; struct buf_cycle *cycle = (struct buf_cycle*) data; this->dropped = dropped; top: if (this->tsp_accum > (this->tsp_denom - this->tsp_num)) { n_tsps += 1; this->tsp_accum -= (this->tsp_denom - this->tsp_num); } else { this->tsp_accum += this->tsp_num; } if (this->pcr_drift_ref != 0) { this->pcr_drift_cycles++; if (this->pcr_drift_cycles % (PCR_DRIFT_INTERVAL * 8000) == 0) { // time to check for PCR drift // estimate the PCR that would correspond to *this* packet, if it had a PCR u64 cur_pcr = this->last_pcr + ((u64) this->packets_since_last_pcr) * this->delta_pcr_per_packet; // compare cur_pcr to pcr_drift_ref + (1/27MHz) * seconds_elapsed_since_pcr_drift_ref s64 drift = (cur_pcr - this->pcr_drift_ref) - (((s64) 27000000) * (this->pcr_drift_cycles / 8000)); #if 0 fprintf (stderr, "PCR drift: %lld after %u seconds\n", drift, this->pcr_drift_cycles/8000); #endif if (ENABLE_PCR_DRIFT_CORRECTION) { //if(pcr_drift_cycles % (5*PCR_DRIFT_INTERVAL*8000) == 0) if ( ( drift > (s64) 27000000 ) | ( drift < (s64) -27000000) ) { fprintf (stderr, "applying PCR correction of %lld\n", -drift); // apply the drift correction //this->last_pcr -= drift; this->pcr_drift_ref = 0; this->pcr_drift_cycles = 0; while (iec61883_deque_size (this->ts_queue) > 0) free (iec61883_deque_pop_front (this->ts_queue)); if (tsbuffer_refill (this) == 0) return 0; goto top; } } } } while (n_tsps > iec61883_deque_size (this->ts_queue)) if (tsbuffer_read_ts (this) == 0) return 0; // write the CIP header fill_mpeg_cip_header (&cycle->header, src_node_id, this->iso_counter); for (i = 0; i < n_tsps; i++) { u32 cycle_count ; memcpy ((char*) &cycle->packet[i].data[0], (char*) iec61883_deque_front (this->ts_queue), sizeof (struct mpeg2_ts)); free (iec61883_deque_pop_front (this->ts_queue)); // set timestamp to iso_cycle + SYT_OFFSET + 1000 offsets per TSP // (since at most 3 TSP per packet; this will ensure monotonically // increasing timestamps, spaced out semi-regularly) cycle_count = (iso_cycle + SYT_OFFSET) % 8000; cycle->packet[i].sph = htonl( make_sph( cycle_count, 1000 * i)); this->packets_since_last_pcr += 1; } if (iec61883_deque_size (this->ts_queue) == 0) if (tsbuffer_refill (this) == 0) return 0; // advance continuity counter by 8 per TSP in this cycle this->iso_counter += 8 * n_tsps; return sizeof (struct CIP_header) + n_tsps * sizeof (struct TSP_packet); } void tsbuffer_close (tsbuffer_t this) { iec61883_deque_close (this->ts_queue); free (this); }