OpenMAXBellagio  0.9.3
omx_base_source.c
Go to the documentation of this file.
1 
29 #include <omxcore.h>
30 #include <omx_base_source.h>
31 
33 
/*
 * Body of the base source component constructor (presumably
 * omx_base_source_Constructor(openmaxStandComp, cComponentName)).
 * NOTE(review): the signature line, the declaration of `err`, and original
 * line 42 (the body of the allocation-failure branch) are not visible in this
 * listing -- confirm them against the real source file.
 *
 * Visible behaviour:
 *  - reuses openmaxStandComp->pComponentPrivate when it is already set,
 *    otherwise allocates a zero-filled omx_base_source_PrivateType;
 *  - calls omx_base_component_Constructor() and keeps its result in `err`;
 *  - re-reads pComponentPrivate and installs
 *    omx_base_source_BufferMgmtFunction as the BufferMgmtFunction pointer;
 *  - returns `err`.
 */
35  omx_base_source_PrivateType* omx_base_source_Private;
36 
/* A derived component may already have allocated the private structure. */
37  if (openmaxStandComp->pComponentPrivate) {
38  omx_base_source_Private = (omx_base_source_PrivateType*)openmaxStandComp->pComponentPrivate;
39  } else {
40  omx_base_source_Private = calloc(1,sizeof(omx_base_source_PrivateType));
/* NOTE(review): the failure branch below is empty in this listing (line 42 is
 * missing), and the freshly allocated pointer is not stored into
 * openmaxStandComp->pComponentPrivate anywhere in the visible lines -- verify
 * that the allocation is neither leaked nor ignored by the base constructor. */
41  if (!omx_base_source_Private) {
43  }
44  }
45 
46  // we could create our own port structures here
47  // fixme maybe the base class could use a "port factory" function pointer?
48  err = omx_base_component_Constructor(openmaxStandComp, cComponentName);
49 
50  /* here we can override whatever defaults the base_component constructor set
51  * e.g. we can override the function pointers in the private struct */
/* The local pointer is reloaded from the component: from here on it refers to
 * whatever pComponentPrivate holds after the base constructor has run.
 * NOTE(review): it is dereferenced without a NULL check and regardless of
 * `err` -- confirm the base constructor always leaves it valid. */
52  omx_base_source_Private = openmaxStandComp->pComponentPrivate;
53  omx_base_source_Private->BufferMgmtFunction = omx_base_source_BufferMgmtFunction;
54 
55  return err;
56 }
57 
59 
/*
 * Body of the base source component destructor (presumably
 * omx_base_source_Destructor(openmaxStandComp); the signature line is not
 * visible in this listing -- confirm against the real source file).
 * It does no work of its own: it delegates to omx_base_component_Destructor()
 * and returns that function's result unchanged.
 */
60  return omx_base_component_Destructor(openmaxStandComp);
61 }
62 
69 
/*
 * Body of the single-output-port buffer management thread routine (presumably
 * omx_base_source_BufferMgmtFunction(void *param); the signature line is not
 * visible in this listing -- confirm against the real source file).
 *
 * `param` is cast to the OMX_COMPONENTTYPE whose pComponentPrivate holds the
 * component state. The main loop runs while the state is Idle, Executing or
 * Pause, or while the transient state is LoadedToIdle. On each pass it:
 *   1. services a pending flush of the output port, handing back any buffer
 *      it currently holds;
 *   2. blocks on bMgmtSem when it needs a buffer and none is queued;
 *   3. leaves the loop when the state is Loaded or Invalid;
 *   4. dequeues one output buffer when the port semaphore count is positive;
 *   5. applies a pending mark, invokes BufferMgmtCallback (Executing state
 *      only), and raises OMX_EventMark / OMX_EventBufferFlag as needed;
 *   6. returns the buffer through the port's ReturnBufferFunction once it
 *      holds data, carries EOS, or EOS was already reached.
 * The routine always returns NULL.
 */
70  OMX_COMPONENTTYPE* openmaxStandComp = (OMX_COMPONENTTYPE*)param;
/* Both private pointers below alias the same pComponentPrivate object. */
71  omx_base_component_PrivateType* omx_base_component_Private = (omx_base_component_PrivateType*)openmaxStandComp->pComponentPrivate;
72  omx_base_source_PrivateType* omx_base_source_Private = (omx_base_source_PrivateType*)omx_base_component_Private;
73  omx_base_PortType *pOutPort = (omx_base_PortType *)omx_base_source_Private->ports[OMX_BASE_SOURCE_OUTPUTPORT_INDEX];
74  tsem_t* pOutputSem = pOutPort->pBufferSem;
75  queue_t* pOutputQueue = pOutPort->pBufferQueue;
76  OMX_BUFFERHEADERTYPE* pOutputBuffer = NULL;
77  OMX_COMPONENTTYPE* target_component;
/* OMX_TRUE while no buffer is held; OMX_FALSE while pOutputBuffer is in use. */
78  OMX_BOOL isOutputBufferNeeded = OMX_TRUE;
/* Incremented on every dequeue and decremented on every return. */
79  int outBufExchanged = 0;
80 
81 #if defined(__linux__)
/* Record the kernel thread id of this thread in the component's thread table. */
82  omx_base_source_Private->bellagioThreads->nThreadBufferMngtID = (long int)syscall(__NR_gettid);
83  DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s the thread ID is %i\n", __func__, (int)omx_base_source_Private->bellagioThreads->nThreadBufferMngtID);
84 #endif
85 
86  DEBUG(DEB_LEV_FUNCTION_NAME, "In %s \n", __func__);
87  while(omx_base_component_Private->state == OMX_StateIdle || omx_base_component_Private->state == OMX_StateExecuting ||
88  omx_base_component_Private->state == OMX_StatePause || omx_base_component_Private->transientState == OMX_TransStateLoadedToIdle){
89 
90  /* While the output port is being flushed: hand back any held buffer, signal flush_all_condition and wait on flush_condition */
/* flush_mutex is held only while the flush flag is tested; it is released for
 * the body of the inner loop and re-acquired before the next test. */
91  pthread_mutex_lock(&omx_base_source_Private->flush_mutex);
92  while( PORT_IS_BEING_FLUSHED(pOutPort)) {
93  pthread_mutex_unlock(&omx_base_source_Private->flush_mutex);
94 
95  if(isOutputBufferNeeded == OMX_FALSE) {
96  pOutPort->ReturnBufferFunction(pOutPort, pOutputBuffer);
97  outBufExchanged--;
98  pOutputBuffer = NULL;
99  isOutputBufferNeeded = OMX_TRUE;
100  DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning output buffer\n");
101  }
102  DEBUG(DEB_LEV_FULL_SEQ, "In %s signalling flush all condition \n", __func__);
103 
104  tsem_up(omx_base_source_Private->flush_all_condition);
105  tsem_down(omx_base_source_Private->flush_condition);
106  pthread_mutex_lock(&omx_base_source_Private->flush_mutex);
107  }
108  pthread_mutex_unlock(&omx_base_source_Private->flush_mutex);
109 
110  /*No buffer to process. So wait here*/
111  if((isOutputBufferNeeded==OMX_TRUE && pOutputSem->semval==0) &&
112  (omx_base_source_Private->state != OMX_StateLoaded && omx_base_source_Private->state != OMX_StateInvalid)) {
113  DEBUG(DEB_LEV_SIMPLE_SEQ, "Waiting for output buffer \n");
114  tsem_down(omx_base_source_Private->bMgmtSem);
115  }
116 
/* The state may have changed while blocked above, so it is tested again. */
117  if(omx_base_source_Private->state == OMX_StateLoaded || omx_base_source_Private->state == OMX_StateInvalid) {
118  DEBUG(DEB_LEV_FULL_SEQ, "In %s Buffer Management Thread is exiting\n",__func__);
119  break;
120  }
121 
122  DEBUG(DEB_LEV_SIMPLE_SEQ, "Waiting for output buffer semval=%d \n",pOutputSem->semval);
/* Take one buffer from the port queue when the semaphore count says one is available. */
123  if(pOutputSem->semval > 0 && isOutputBufferNeeded == OMX_TRUE ) {
124  tsem_down(pOutputSem);
125  if(pOutputQueue->nelem>0){
126  outBufExchanged++;
127  isOutputBufferNeeded = OMX_FALSE;
128  pOutputBuffer = dequeue(pOutputQueue);
129  if(pOutputBuffer == NULL){
130  DEBUG(DEB_LEV_ERR, "In %s Had NULL output buffer!!\n",__func__);
131  break;
132  }
133  }
134  }
135 
136  if(isOutputBufferNeeded == OMX_FALSE) {
/* Clear all flags when the freshly acquired buffer still carries EOS. */
137  if((pOutputBuffer->nFlags & OMX_BUFFERFLAG_EOS) == OMX_BUFFERFLAG_EOS) {
138  pOutputBuffer->nFlags = 0;
139  }
140 
/* Move a pending mark from the component onto this buffer, then clear it on the component. */
141  if(omx_base_source_Private->pMark.hMarkTargetComponent != NULL){
142  pOutputBuffer->hMarkTargetComponent = omx_base_source_Private->pMark.hMarkTargetComponent;
143  pOutputBuffer->pMarkData = omx_base_source_Private->pMark.pMarkData;
144  omx_base_source_Private->pMark.hMarkTargetComponent = NULL;
145  omx_base_source_Private->pMark.pMarkData = NULL;
146  }
147 
148  target_component = (OMX_COMPONENTTYPE*)pOutputBuffer->hMarkTargetComponent;
149  if(target_component == (OMX_COMPONENTTYPE *)openmaxStandComp) {
150  /* This component is the mark target: generate a mark event */
151  (*(omx_base_component_Private->callbacks->EventHandler))
152  (openmaxStandComp,
153  omx_base_component_Private->callbackData,
154  OMX_EventMark, /* eEvent */
155  1, /* nData1 */
156  0, /* nData2 */
157  pOutputBuffer->pMarkData);
158  } else if(pOutputBuffer->hMarkTargetComponent != NULL) {
159  /*If this is not the target component then pass the mark*/
160  DEBUG(DEB_LEV_FULL_SEQ, "Pass Mark. This is a Source!!\n");
161  }
162 
163  if(omx_base_source_Private->state == OMX_StateExecuting) {
164  if (omx_base_source_Private->BufferMgmtCallback && pOutputBuffer->nFilledLen == 0) {
165  (*(omx_base_source_Private->BufferMgmtCallback))(openmaxStandComp, pOutputBuffer);
166  } else {
167  /* No buffer management callback, or the buffer already reports data: reset nFilledLen so no output is produced */
168  pOutputBuffer->nFilledLen = 0;
169  }
170  } else {
171  DEBUG(DEB_LEV_ERR, "In %s Received Buffer in non-Executing State(%x)\n", __func__, (int)omx_base_source_Private->state);
172  }
173  if(omx_base_source_Private->state == OMX_StatePause && !PORT_IS_BEING_FLUSHED(pOutPort)) {
174  /*Waiting at paused state*/
175  tsem_wait(omx_base_source_Private->bStateSem);
176  }
177 
/* EOS set at this point was raised after the flag reset above (e.g. by the callback). */
178  if((pOutputBuffer->nFlags & OMX_BUFFERFLAG_EOS) == OMX_BUFFERFLAG_EOS) {
179  DEBUG(DEB_LEV_SIMPLE_SEQ, "Detected EOS flags in output buffer\n");
180 
181  (*(omx_base_component_Private->callbacks->EventHandler))
182  (openmaxStandComp,
183  omx_base_component_Private->callbackData,
184  OMX_EventBufferFlag, /* eEvent */
185  0, /* nData1 */
186  pOutputBuffer->nFlags, /* nData2: the buffer flags */
187  NULL);
188  //pOutputBuffer->nFlags = 0;
189  omx_base_source_Private->bIsEOSReached = OMX_TRUE;
190  }
191 
192  /*Output Buffer has been produced or EOS. So, return output buffer and get new buffer*/
193  if((pOutputBuffer->nFilledLen != 0) || ((pOutputBuffer->nFlags & OMX_BUFFERFLAG_EOS) ==OMX_BUFFERFLAG_EOS) || (omx_base_source_Private->bIsEOSReached == OMX_TRUE)) {
194  if((pOutputBuffer->nFlags & OMX_BUFFERFLAG_EOS) == OMX_BUFFERFLAG_EOS)
195  DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s nFlags=%x Name=%s \n", __func__, (int)pOutputBuffer->nFlags, omx_base_source_Private->name);
196  pOutPort->ReturnBufferFunction(pOutPort, pOutputBuffer);
197  outBufExchanged--;
198  pOutputBuffer = NULL;
199  isOutputBufferNeeded = OMX_TRUE;
200  }
201  }
202  }
203  DEBUG(DEB_LEV_SIMPLE_SEQ, "Exiting Buffer Management Thread\n");
204  return NULL;
205 }
206 
/*
 * Body of the two-output-port buffer management thread routine (presumably
 * omx_base_source_twoport_BufferMgmtFunction(void *param); the signature line
 * is not visible in this listing -- confirm against the real source file).
 *
 * Same overall scheme as a single-port buffer loop, but every piece of
 * per-port state (port, semaphore, queue, held buffer, "buffer needed" flag,
 * exchange counter) is kept in a two-element array indexed 0 and 1.
 * On each pass of the main loop it:
 *   1. services a pending flush on either port, returning the held buffer of
 *      each port that is being flushed;
 *   2. blocks on bMgmtSem when port 0, then port 1, needs a buffer and has
 *      none queued;
 *   3. leaves the loop when the state is Loaded or Invalid;
 *   4. dequeues one buffer per port when available;
 *   5. for each processed port index: applies the pending mark, invokes
 *      BufferMgmtCallback (Executing state only), raises OMX_EventMark /
 *      OMX_EventBufferFlag, and returns the buffer once it holds data or EOS;
 *   6. clears the component's pending mark.
 * The routine always returns NULL.
 */
214  OMX_COMPONENTTYPE* openmaxStandComp = (OMX_COMPONENTTYPE*)param;
/* Both private pointers below alias the same pComponentPrivate object. */
215  omx_base_component_PrivateType* omx_base_component_Private=(omx_base_component_PrivateType*)openmaxStandComp->pComponentPrivate;
216  omx_base_source_PrivateType* omx_base_source_Private = (omx_base_source_PrivateType*)omx_base_component_Private;
217  omx_base_PortType *pOutPort[2];
218  tsem_t* pOutputSem[2];
219  queue_t* pOutputQueue[2];
220  OMX_BUFFERHEADERTYPE* pOutputBuffer[2];
221  OMX_COMPONENTTYPE* target_component;
222  OMX_BOOL isOutputBufferNeeded[2];
223  int i,outBufExchanged[2];
224 
225  pOutPort[0]=(omx_base_PortType *)omx_base_source_Private->ports[OMX_BASE_SOURCE_OUTPUTPORT_INDEX];
226  pOutPort[1]=(omx_base_PortType *)omx_base_source_Private->ports[OMX_BASE_SOURCE_OUTPUTPORT_INDEX_1];
227  pOutputSem[0] = pOutPort[0]->pBufferSem;
228  pOutputSem[1] = pOutPort[1]->pBufferSem;
229  pOutputQueue[0] = pOutPort[0]->pBufferQueue;
230  pOutputQueue[1] = pOutPort[1]->pBufferQueue;
/* Start with no buffer held on either port. */
231  pOutputBuffer[1]= pOutputBuffer[0]=NULL;
232  isOutputBufferNeeded[0]=isOutputBufferNeeded[1]=OMX_TRUE;
233  outBufExchanged[0]=outBufExchanged[1]=0;
234 
235  DEBUG(DEB_LEV_FUNCTION_NAME, "In %s\n", __func__);
236  while(omx_base_source_Private->state == OMX_StateIdle || omx_base_source_Private->state == OMX_StateExecuting || omx_base_source_Private->state == OMX_StatePause ||
237  omx_base_source_Private->transientState == OMX_TransStateLoadedToIdle){
238 
239  /* While either output port is being flushed: hand back held buffers, signal flush_all_condition and wait on flush_condition */
/* flush_mutex is held only while the flush flags are tested; it is released
 * for the body of the inner loop and re-acquired before the next test. */
240  pthread_mutex_lock(&omx_base_source_Private->flush_mutex);
241  while( PORT_IS_BEING_FLUSHED(pOutPort[0]) ||
242  PORT_IS_BEING_FLUSHED(pOutPort[1])) {
243  pthread_mutex_unlock(&omx_base_source_Private->flush_mutex);
244 
245  DEBUG(DEB_LEV_FULL_SEQ, "In %s 1 signalling flush all cond iE=%d,iF=%d,oE=%d,oF=%d iSemVal=%d,oSemval=%d\n",
246  __func__,outBufExchanged[0],isOutputBufferNeeded[0],outBufExchanged[1],isOutputBufferNeeded[1],pOutputSem[0]->semval,pOutputSem[1]->semval);
247 
/* Only the port that is actually being flushed gives its buffer back. */
248  if(isOutputBufferNeeded[1]==OMX_FALSE && PORT_IS_BEING_FLUSHED(pOutPort[1])) {
249  pOutPort[1]->ReturnBufferFunction(pOutPort[1],pOutputBuffer[1]);
250  outBufExchanged[1]--;
251  pOutputBuffer[1]=NULL;
252  isOutputBufferNeeded[1]=OMX_TRUE;
253  DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning output 1 buffer\n");
254  }
255 
256  if(isOutputBufferNeeded[0]==OMX_FALSE && PORT_IS_BEING_FLUSHED(pOutPort[0])) {
257  pOutPort[0]->ReturnBufferFunction(pOutPort[0],pOutputBuffer[0]);
258  outBufExchanged[0]--;
259  pOutputBuffer[0]=NULL;
260  isOutputBufferNeeded[0]=OMX_TRUE;
261  DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning output 0 buffer\n");
262  }
263 
264  DEBUG(DEB_LEV_FULL_SEQ, "In %s 2 signalling flush all cond iE=%d,iF=%d,oE=%d,oF=%d iSemVal=%d,oSemval=%d\n",
265  __func__,outBufExchanged[0],isOutputBufferNeeded[0],outBufExchanged[1],isOutputBufferNeeded[1],pOutputSem[0]->semval,pOutputSem[1]->semval);
266 
267  tsem_up(omx_base_source_Private->flush_all_condition);
268  tsem_down(omx_base_source_Private->flush_condition);
269  pthread_mutex_lock(&omx_base_source_Private->flush_mutex);
270  }
271  pthread_mutex_unlock(&omx_base_source_Private->flush_mutex);
272 
273  /*No buffer to process. So wait here*/
274  if((isOutputBufferNeeded[0]==OMX_TRUE && pOutputSem[0]->semval==0) &&
275  (omx_base_source_Private->state != OMX_StateLoaded && omx_base_source_Private->state != OMX_StateInvalid)) {
276  //Signalled from EmptyThisBuffer or FillThisBuffer or some thing else
277  DEBUG(DEB_LEV_FULL_SEQ, "Waiting for next output buffer 0\n");
278  tsem_down(omx_base_source_Private->bMgmtSem);
279 
280  }
/* The state may have changed while blocked above, so it is tested again. */
281  if(omx_base_source_Private->state == OMX_StateLoaded || omx_base_source_Private->state == OMX_StateInvalid) {
282  DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s Buffer Management Thread is exiting\n",__func__);
283  break;
284  }
/* Same wait for port 1, skipped when a flush started on either port in the meantime. */
285  if((isOutputBufferNeeded[1]==OMX_TRUE && pOutputSem[1]->semval==0) &&
286  (omx_base_source_Private->state != OMX_StateLoaded && omx_base_source_Private->state != OMX_StateInvalid) &&
287  !(PORT_IS_BEING_FLUSHED(pOutPort[0]) || PORT_IS_BEING_FLUSHED(pOutPort[1]))) {
288  //Signalled from EmptyThisBuffer or FillThisBuffer or some thing else
289  DEBUG(DEB_LEV_FULL_SEQ, "Waiting for next output buffer 1\n");
290  tsem_down(omx_base_source_Private->bMgmtSem);
291 
292  }
293  if(omx_base_source_Private->state == OMX_StateLoaded || omx_base_source_Private->state == OMX_StateInvalid) {
294  DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s Buffer Management Thread is exiting\n",__func__);
295  break;
296  }
297 
298  DEBUG(DEB_LEV_SIMPLE_SEQ, "Waiting for output buffer 0 semval=%d \n",pOutputSem[0]->semval);
/* Take one buffer from the port 0 queue when its semaphore count is positive. */
299  if(pOutputSem[0]->semval>0 && isOutputBufferNeeded[0]==OMX_TRUE ) {
300  tsem_down(pOutputSem[0]);
301  if(pOutputQueue[0]->nelem>0){
302  outBufExchanged[0]++;
303  isOutputBufferNeeded[0]=OMX_FALSE;
304  pOutputBuffer[0] = dequeue(pOutputQueue[0]);
305  if(pOutputBuffer[0] == NULL){
306  DEBUG(DEB_LEV_ERR, "Had NULL output buffer!!\n");
307  break;
308  }
309  }
310  }
311  /* Likewise take one buffer from the port 1 queue when its semaphore count is positive */
312  if(pOutputSem[1]->semval>0 && isOutputBufferNeeded[1]==OMX_TRUE) {
313  tsem_down(pOutputSem[1]);
314  if(pOutputQueue[1]->nelem>0){
315  outBufExchanged[1]++;
316  isOutputBufferNeeded[1]=OMX_FALSE;
317  pOutputBuffer[1] = dequeue(pOutputQueue[1]);
318  if(pOutputBuffer[1] == NULL){
319  DEBUG(DEB_LEV_ERR, "Had NULL output buffer!! op is=%d,iq=%d\n",pOutputSem[1]->semval,pOutputQueue[1]->nelem);
320  break;
321  }
322  }
323  }
324 
/* Visit every port index below (sum of the four per-domain port counts - 1).
 * NOTE(review): the per-port arrays indexed by i have only two elements;
 * confirm the computed bound can never exceed 2 for components using this routine. */
325  for(i=0;i < (omx_base_component_Private->sPortTypesParam[OMX_PortDomainAudio].nPorts +
326  omx_base_component_Private->sPortTypesParam[OMX_PortDomainVideo].nPorts +
327  omx_base_component_Private->sPortTypesParam[OMX_PortDomainImage].nPorts +
328  omx_base_component_Private->sPortTypesParam[OMX_PortDomainOther].nPorts -1);i++) {
329 
330  if(omx_base_source_Private->ports[i]->sPortParam.eDomain!=OMX_PortDomainOther){ /* clock ports are not to be processed */
331  /*Process Output buffer of Port i */
332  if(isOutputBufferNeeded[i]==OMX_FALSE) {
333 
334  /*Pass the Mark to all outgoing buffers*/
/* The component's pending mark is not cleared here; it is cleared once per
 * pass of the outer loop, after all ports have been visited. */
335  if(omx_base_source_Private->pMark.hMarkTargetComponent != NULL){
336  pOutputBuffer[i]->hMarkTargetComponent = omx_base_source_Private->pMark.hMarkTargetComponent;
337  pOutputBuffer[i]->pMarkData = omx_base_source_Private->pMark.pMarkData;
338  }
339 
340  target_component=(OMX_COMPONENTTYPE*)pOutputBuffer[i]->hMarkTargetComponent;
341  if(target_component==(OMX_COMPONENTTYPE *)openmaxStandComp) {
342  /* This component is the mark target: generate a mark event */
343  (*(omx_base_source_Private->callbacks->EventHandler))
344  (openmaxStandComp,
345  omx_base_source_Private->callbackData,
346  OMX_EventMark, /* eEvent */
347  1, /* nData1 */
348  i, /* nData2: index of the port being processed */
349  pOutputBuffer[i]->pMarkData);
350  } else if(pOutputBuffer[i]->hMarkTargetComponent!=NULL){
351  /*If this is not the target component then pass the mark*/
352  DEBUG(DEB_LEV_FULL_SEQ, "Pass Mark. This is a Source!!\n");
353  }
354 
355  if(omx_base_source_Private->state == OMX_StateExecuting) {
356  if (omx_base_source_Private->BufferMgmtCallback && pOutputBuffer[i]->nFilledLen == 0) {
357  (*(omx_base_source_Private->BufferMgmtCallback))(openmaxStandComp, pOutputBuffer[i]);
358  } else {
359  /* No buffer management callback, or the buffer already reports data: reset nFilledLen so no output is produced */
360  pOutputBuffer[i]->nFilledLen = 0;
361  }
362  } else {
363  DEBUG(DEB_LEV_ERR, "In %s Received Buffer in non-Executing State(%x)\n", __func__, (int)omx_base_source_Private->state);
364  }
365 
/* The buffer-flag event is raised only for an EOS buffer that carries no data. */
366  if((pOutputBuffer[i]->nFlags & OMX_BUFFERFLAG_EOS) == OMX_BUFFERFLAG_EOS && pOutputBuffer[i]->nFilledLen==0) {
367  DEBUG(DEB_LEV_FULL_SEQ, "Detected EOS flags in input buffer filled len=%d\n", (int)pOutputBuffer[i]->nFilledLen);
368  (*(omx_base_source_Private->callbacks->EventHandler))
369  (openmaxStandComp,
370  omx_base_source_Private->callbackData,
371  OMX_EventBufferFlag, /* eEvent */
372  i, /* nData1: index of the port being processed */
373  pOutputBuffer[i]->nFlags, /* nData2: the buffer flags */
374  NULL);
375  }
376  if(omx_base_source_Private->state==OMX_StatePause && !(PORT_IS_BEING_FLUSHED(pOutPort[0]) || PORT_IS_BEING_FLUSHED(pOutPort[1]))) {
377  /*Waiting at paused state*/
378  tsem_wait(omx_base_component_Private->bStateSem);
379  }
380 
381  /*Output Buffer has been produced or EOS. So, return output buffer and get new buffer*/
382  if(pOutputBuffer[i]->nFilledLen!=0 || (pOutputBuffer[i]->nFlags & OMX_BUFFERFLAG_EOS) == OMX_BUFFERFLAG_EOS){
383  pOutPort[i]->ReturnBufferFunction(pOutPort[i],pOutputBuffer[i]);
384  outBufExchanged[i]--;
385  pOutputBuffer[i]=NULL;
386  isOutputBufferNeeded[i]=OMX_TRUE;
387  }
388  }
389  }
390  }
391 
392  /*Clear the Mark*/
393  if(omx_base_source_Private->pMark.hMarkTargetComponent != NULL){
394  omx_base_source_Private->pMark.hMarkTargetComponent = NULL;
395  omx_base_source_Private->pMark.pMarkData = NULL;
396  }
397  }
398  DEBUG(DEB_LEV_SIMPLE_SEQ,"Exiting Buffer Management Thread\n");
399  return NULL;
400 }
void tsem_wait(tsem_t *tsem)
Definition: tsemaphore.c:131
OMX_ERRORTYPE omx_base_component_Constructor(OMX_COMPONENTTYPE *openmaxStandComp, OMX_STRING cComponentName)
The base constructor for the OpenMAX ST components.
#define DEB_LEV_SIMPLE_SEQ
#define PORT_IS_BEING_FLUSHED(pPort)
Definition: omx_base_port.h:39
#define OMX_BASE_SOURCE_OUTPUTPORT_INDEX
#define DEBUG(n, fmt, args...)
queue_t * pBufferQueue
void *(* BufferMgmtFunction)(void *param)
char * OMX_STRING
Definition: OMX_Types.h:206
OMX_BOOL
Definition: OMX_Types.h:189
#define DEB_LEV_ERR
OMX_ERRORTYPE(* EventHandler)(OMX_IN OMX_HANDLETYPE hComponent, OMX_IN OMX_PTR pAppData, OMX_IN OMX_EVENTTYPE eEvent, OMX_IN OMX_U32 nData1, OMX_IN OMX_U32 nData2, OMX_IN OMX_PTR pEventData)
Definition: OMX_Core.h:530
#define OMX_BASE_SOURCE_OUTPUTPORT_INDEX_1
void tsem_up(tsem_t *tsem)
Definition: tsemaphore.c:110
OMX_ERRORTYPE omx_base_source_Constructor(OMX_COMPONENTTYPE *openmaxStandComp, OMX_STRING cComponentName)
void tsem_down(tsem_t *tsem)
Definition: tsemaphore.c:97
Definition: queue.h:43
void * omx_base_source_twoport_BufferMgmtFunction(void *param)
OMX_ERRORTYPE omx_base_source_Destructor(OMX_COMPONENTTYPE *openmaxStandComp)
#define OMX_BUFFERFLAG_EOS
Definition: OMX_Core.h:299
OMX_ERRORTYPE omx_base_component_Destructor(OMX_COMPONENTTYPE *openmaxStandComp)
The base destructor for ST OpenMAX components.
OMX_ERRORTYPE err
#define DEB_LEV_FULL_SEQ
OMX_HANDLETYPE hMarkTargetComponent
Definition: OMX_Core.h:417
OMX_PTR pComponentPrivate
#define DEB_LEV_FUNCTION_NAME
void * omx_base_source_BufferMgmtFunction(void *param)
OMX_ERRORTYPE(* ReturnBufferFunction)(omx_base_PortType *openmaxStandPort, OMX_BUFFERHEADERTYPE *pBuffer)
void * dequeue(queue_t *queue)
Definition: queue.c:122
OMX_PORT_PARAM_TYPE sPortTypesParam[4]
OMX_ERRORTYPE
Definition: OMX_Core.h:126

Generated for OpenMAX Bellagio rel. 0.9.3 by  doxygen 1.5.1
SourceForge.net Logo