This section explains how to handle exceptions related to the Thrift/Binary transport, such as queue-full exceptions, by understanding its threading model. It also covers how to analyze thread dumps of a congested system.

The complete event flow of the Thrift/Binary transport is described below.

The Thrift/Binary transport threads accept incoming events from the transport and pass them to the Event Blocking Queue in the databridge component. The maximum number of transport threads that can be spawned in the thread pool to intercept messages is determined by the tcpMaxWorkerThreads property in the <DAS_HOME>/repository/conf/data-bridge/data-bridge-config.xml file. Separate thread pools are created for the Binary transport and the Thrift transport.
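A minimal sketch of where this property might sit in the data-bridge-config.xml file is shown below. The element nesting, port numbers, and thread-pool size used here are illustrative assumptions; check the file shipped with your product version for the exact layout and defaults.

<dataBridgeConfiguration xmlns="http://wso2.org/carbon/databridge">
    <!-- Thrift receiver: tcpMaxWorkerThreads caps the Thrift transport thread pool -->
    <dataReceiver name="Thrift">
        <config name="tcpPort">7611</config>
        <config name="tcpMaxWorkerThreads">100</config>
    </dataReceiver>
    <!-- Binary receiver: configured separately, so it gets its own thread pool -->
    <dataReceiver name="Binary">
        <config name="tcpPort">9611</config>
        <config name="tcpMaxWorkerThreads">100</config>
    </dataReceiver>
</dataBridgeConfiguration>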

The Event Blocking Queue is a single queue that is shared by the events from all streams. The size of this queue is determined by the eventBufferSize parameter in the data-bridge-config.xml file. Once events are placed in this queue, the Databridge core thread pool (with threads named DataBridge-Core-pool-x-thread-x) consumes these events and adds them to the Blocking Event Queue inside the QueueInputEventDispatcher component. The number of threads in the Databridge core thread pool is determined by the workerThreads parameter in the data-bridge-config.xml file.
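The two databridge-core parameters mentioned above are set at the top level of the same file. The sketch below shows where they might sit; the values are illustrative assumptions rather than guaranteed defaults.

<dataBridgeConfiguration xmlns="http://wso2.org/carbon/databridge">
    <!-- Number of DataBridge-Core-pool consumer threads (ten by default, per the description above) -->
    <workerThreads>10</workerThreads>
    <!-- Capacity of the shared Event Blocking Queue -->
    <eventBufferSize>2000</eventBufferSize>
    <!-- data receiver definitions omitted for brevity -->
</dataBridgeConfiguration>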

The QueueInputEventDispatcher maintains a queue per stream, as well as a consumer thread per queue named Thread pool- component - QueueInputEventDispatcher.executorService, appended with the tenant ID, stream name, and receiver name. Each consumer consumes its allocated queue and propagates the events through the event processing pipeline, including the final I/O operation that outputs the event.

Analyzing a congested system

A congested system is often indicated by the EventQueueFullException error. If this error occurs repeatedly in your system after applying the recommended configurations to the data-bridge transport, a likely cause is that the event processing pipeline is slow and cannot handle the incoming event rate. To diagnose such a scenario, take 3-5 consecutive thread dumps of the system (for example, with the jstack tool or by sending a kill -3 signal to the server process).

When analyzing the thread dumps, you can start with the threads named DataBridge-Core-pool-x-thread-x because they are clearly named and easy to find. Also, there are only ten such threads by default. When the system is congested, you can find all of these threads waiting to push events into the BlockingEventQueue, as shown in the following stack trace.

"DataBridge-Core-pool-1-thread-10" #463 prio=5 os_prio=0 tid=0x00007f114812d000 nid=0x3d88 waiting on condition [0x00007f10cac4b000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000004cba0e358> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
        at org.wso2.carbon.event.receiver.core.internal.management.BlockingEventQueue.put(BlockingEventQueue.java:66)
        at org.wso2.carbon.event.receiver.core.internal.management.QueueInputEventDispatcher.onEvent(QueueInputEventDispatcher.java:74)
        at org.wso2.carbon.event.receiver.core.internal.EventReceiver.sendEvent(EventReceiver.java:298)
        at org.wso2.carbon.event.receiver.core.internal.EventReceiver.processTypedEvent(EventReceiver.java:260)
        at org.wso2.carbon.event.receiver.core.internal.EventReceiver$TypedEventSubscription.onEvent(EventReceiver.java:362)
        at org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime.onEvent(InputAdapterRuntime.java:110)
        at org.wso2.carbon.event.input.adapter.wso2event.internal.ds.WSO2EventAdapterServiceDS$1.receive(WSO2EventAdapterServiceDS.java:92)
        at org.wso2.carbon.databridge.core.internal.queue.QueueWorker.run(QueueWorker.java:81)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

When all the Databridge core threads are waiting as shown above, there are no consumers for the Event Blocking Queue. As a result, that queue becomes full. This in turn causes the transport threads to wait until there is space in the Event Blocking Queue to hold the intercepted events, resulting in the following stack trace.

"pool-50-thread-7" #652 prio=5 os_prio=0 tid=0x00007f113802f000 nid=0x3e7c waiting for monitor entry [0x00007f10cb453000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.wso2.carbon.databridge.core.internal.queue.EventBlockingQueue.put(EventBlockingQueue.java:49)
        - waiting to lock <0x00000004c1df21e8> (a org.wso2.carbon.databridge.core.internal.queue.EventBlockingQueue)
        at org.wso2.carbon.databridge.core.internal.queue.EventBlockingQueue.put(EventBlockingQueue.java:33)
        at org.wso2.carbon.databridge.core.internal.queue.EventQueue.publish(EventQueue.java:60)
        at org.wso2.carbon.databridge.core.internal.EventDispatcher.publish(EventDispatcher.java:202)
        at org.wso2.carbon.databridge.core.DataBridge.publish(DataBridge.java:248)
        at org.wso2.carbon.databridge.receiver.thrift.service.ThriftEventTransmissionServiceImpl.publish(ThriftEventTransmissionServiceImpl.java:108)
        at org.wso2.carbon.databridge.commons.thrift.service.general.ThriftEventTransmissionService$Processor$publish.getResult(ThriftEventTransmissionService.java:522)
        at org.wso2.carbon.databridge.commons.thrift.service.general.ThriftEventTransmissionService$Processor$publish.getResult(ThriftEventTransmissionService.java:506)
        at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
        at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
        at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:285)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

When the above occurs, the Thrift/Binary transport cannot accept any more incoming events. This results in the EventQueueFullException at the sending client's end (such as WSO2 EI, WSO2 APIM, and WSO2 IS).

To address this, you need to check the Blocking Event Queues. A blockage in the databridge Event Blocking Queue suggests that one of the Blocking Event Queues in the QueueInputEventDispatchers is full, and that the Databridge core threads are waiting to put events into that queue.

To identify the affected queue, check all the QueueInputEventDispatcher threads, whose names follow the Thread pool- component - QueueInputEventDispatcher.executorService format. Among them, identify the thread that is blocked in an I/O or some other heavy operation. To confirm this, analyze multiple thread dumps and verify that the same thread remains stuck in that operation across them.

The following is an example of such a blocked thread.
"Thread pool- component - QueueInputEventDispatcher.executorService;tenant - -1234;stream - DAS-Stream-PublishEvent:1.0.2;receiver - -1234/Receiver/DAS-Receiver-PublishEvent_1.0.2" #408 prio=5 os_prio=0 tid=0x00007f1104026000 nid=0x3d25 runnable [0x00007f103d751000]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:171)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at oracle.net.ns.Packet.receive(Packet.java:311)
        at oracle.net.ns.DataPacket.receive(DataPacket.java:105)
        at oracle.net.ns.NetInputStream.getNextPacket(NetInputStream.java:305)
        at oracle.net.ns.NetInputStream.read(NetInputStream.java:249)
        at oracle.net.ns.NetInputStream.read(NetInputStream.java:171)
        at oracle.net.ns.NetInputStream.read(NetInputStream.java:89)
        at oracle.jdbc.driver.T4CSocketInputStreamWrapper.readNextPacket(T4CSocketInputStreamWrapper.java:123)
        at oracle.jdbc.driver.T4CSocketInputStreamWrapper.read(T4CSocketInputStreamWrapper.java:79)
        at oracle.jdbc.driver.T4CMAREngineStream.unmarshalUB1(T4CMAREngineStream.java:426)
        at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:390)
        at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:249)
        at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:566)
        at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:215)
        at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:58)
        at oracle.jdbc.driver.T4CPreparedStatement.executeForRows(T4CPreparedStatement.java:943)
        at oracle.jdbc.driver.OraclePreparedStatement.executeForRowsWithTimeout(OraclePreparedStatement.java:10932)
        at oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:11043)
        - locked <0x00000004e96731d8> (a oracle.jdbc.driver.T4CConnection)
        at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:244)
        at org.wso2.carbon.analytics.datasource.rdbms.RDBMSAnalyticsRecordStore.mergeRecordsSimilar(RDBMSAnalyticsRecordStore.java:213)
        at org.wso2.carbon.analytics.datasource.rdbms.RDBMSAnalyticsRecordStore.addRecordsSimilar(RDBMSAnalyticsRecordStore.java:167)
        at org.wso2.carbon.analytics.datasource.rdbms.RDBMSAnalyticsRecordStore.put(RDBMSAnalyticsRecordStore.java:144)
        at org.wso2.carbon.analytics.dataservice.core.AnalyticsDataServiceImpl.putSimilarRecordBatch(AnalyticsDataServiceImpl.java:891)
        at org.wso2.carbon.analytics.dataservice.core.AnalyticsDataServiceImpl.put(AnalyticsDataServiceImpl.java:881)
        at org.wso2.carbon.analytics.eventtable.AnalyticsEventTableUtils.putEvents(AnalyticsEventTableUtils.java:55)
        at org.wso2.carbon.analytics.eventtable.AnalyticsEventTable.add(AnalyticsEventTable.java:313)
        at org.wso2.siddhi.core.query.output.callback.InsertIntoTableCallback.send(InsertIntoTableCallback.java:69)
        at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:78)
        at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:40)
        at org.wso2.siddhi.core.query.selector.QuerySelector.processNoGroupBy(QuerySelector.java:121)
        at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:86)
        at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:154)
        at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:80)
        at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:154)
        at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:80)
        at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:102)
        at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:126)
        at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:323)
        at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:46)
        at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:78)
        at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:40)
        at org.wso2.siddhi.core.query.selector.QuerySelector.processNoGroupBy(QuerySelector.java:121)
        at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:86)
        at org.wso2.siddhi.core.query.processor.filter.FilterProcessor.process(FilterProcessor.java:56)
        at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:154)
        at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:80)
        at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:150)
        at org.wso2.siddhi.core.stream.StreamJunction.sendData(StreamJunction.java:214)
        at org.wso2.siddhi.core.stream.StreamJunction.access$200(StreamJunction.java:46)
        at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:343)
        at org.wso2.siddhi.core.stream.input.InputDistributor.send(InputDistributor.java:49)
        at org.wso2.siddhi.core.stream.input.InputEntryValve.send(InputEntryValve.java:59)
        at org.wso2.siddhi.core.stream.input.InputHandler.send(InputHandler.java:51)
        at org.wso2.carbon.event.processor.core.internal.listener.SiddhiInputEventDispatcher.sendEvent(SiddhiInputEventDispatcher.java:39)
        at org.wso2.carbon.event.processor.core.internal.listener.AbstractSiddhiInputEventDispatcher.consumeEvent(AbstractSiddhiInputEventDispatcher.java:104)
        at org.wso2.carbon.event.stream.core.internal.EventJunction.sendEvent(EventJunction.java:146)
        at org.wso2.carbon.event.receiver.core.internal.management.QueueInputEventDispatcher$QueueInputEventDispatcherWorker.run(QueueInputEventDispatcher.java:194)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

In this example, the thread is stuck executing a database prepared statement initiated by the InsertIntoTableCallback call. Therefore, in this specific scenario, you need to analyze the database operation and take steps to optimize that flow.

If the DataBridge-Core threads are not blocked as shown above, but are instead waiting on getTask(), the EventBlockingQueue is empty and no events are being delivered to the Databridge layer. In such a scenario, you need to check the Thrift/Binary transport thread pool. When the EventBlockingQueue is empty, you will typically find the transport threads idling on socketRead() as shown in the trace below.


"pool-50-thread-12" #11748 prio=5 os_prio=0 tid=0x00007f1138040800 nid=0x4c6d runnable [0x00007f10b6f7b000]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:171)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
        - locked <0x00000004e4088148> (a java.io.BufferedInputStream)
        at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
        at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
        at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
        at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
        at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
        at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:27)
        at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:285)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)




This means that the publishing component is not sending messages to this system, which can be due to various reasons such as a faulty endpoint address or network issues. In such a scenario, the investigation should be directed at the publishing client.