This section explains how to handle exceptions related to the Thrift/Binary transport, such as queue full exceptions, by understanding its threading model. It also covers how to analyze the thread dumps of a congested system.
The complete event flow of the Thrift/Binary transport is described below.
The Thrift/Binary transport threads accept incoming events from the transport and pass them to the Event Blocking Queue in the databridge component. The maximum number of transport threads that can be spawned in the thread pool to intercept messages is determined by the tcpMaxWorkerThreads property in the <DAS_HOME>/repository/conf/data-bridge/data-bridge-config.xml file. Separate thread pools are created for the Binary transport and the Thrift transport.
The Event Blocking Queue is a single queue that is common to the events from all the streams. The size of this queue is determined by the eventBufferSize parameter in the data-bridge-config.xml file. Once events are directed to this queue, the Databridge core thread pool (whose threads are named Databridge-Core-pool-x-thread-x) consumes these events and adds them to the Blocking Event Queue inside the QueueInputEventDispatcher component. The number of threads in the Databridge core thread pool is determined by the workerThreads parameter in the data-bridge-config.xml file.
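For reference, all three parameters live in the same data-bridge-config.xml file. The snippet below is only an illustrative sketch: the exact element nesting differs between product versions (for example, tcpMaxWorkerThreads may sit under a receiver-specific section), and the values shown are assumptions rather than recommended settings.

```
<!-- Illustrative sketch of <DAS_HOME>/repository/conf/data-bridge/data-bridge-config.xml.
     Check the file shipped with your product; element nesting and defaults may differ. -->
<dataBridgeConfiguration>
    <!-- Number of Databridge-Core-pool-x-thread-x consumer threads (10 by default) -->
    <workerThreads>10</workerThreads>
    <!-- Capacity of the shared Event Blocking Queue at the databridge component (illustrative value) -->
    <eventBufferSize>10000</eventBufferSize>
    <!-- Maximum transport worker threads; separate pools are created for Thrift and Binary (illustrative value) -->
    <tcpMaxWorkerThreads>100</tcpMaxWorkerThreads>
</dataBridgeConfiguration>
```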
The QueueInputEventDispatcher has a queue per stream, as well as a consumer thread per queue named in the Thread pool-component -QueueInputEventDispatcher.ExecutorService format, appended with the tenant name, stream name, and receiver name. Each consumer drains its allocated queue and propagates the events through the event processing pipeline, including the final I/O operation to output the event.
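The dispatching pattern is essentially one bounded queue per stream, each drained by a single dedicated consumer. The Java sketch below illustrates that idea only; it is not the actual QueueInputEventDispatcher implementation, and names such as StreamDispatcher and processEvent are hypothetical.

```
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch only: one bounded queue per stream, each drained by its own consumer thread.
public class StreamDispatcher {

    private final Map<String, BlockingQueue<Object>> queues = new ConcurrentHashMap<>();
    private final ExecutorService consumers = Executors.newCachedThreadPool();
    private final int queueCapacity;

    public StreamDispatcher(int queueCapacity) {
        this.queueCapacity = queueCapacity;
    }

    // Called by the Databridge core threads; blocks when the per-stream queue is full,
    // which is roughly the behaviour seen in the BlockingEventQueue.put() stack trace below.
    public void onEvent(String streamId, Object event) throws InterruptedException {
        queues.computeIfAbsent(streamId, id -> {
            BlockingQueue<Object> queue = new ArrayBlockingQueue<>(queueCapacity);
            consumers.submit(() -> drain(id, queue));   // one dedicated consumer per stream
            return queue;
        }).put(event);                                  // blocks if the per-stream queue is full
    }

    // The per-stream consumer: pushes events through the processing pipeline,
    // including the final I/O operation (for example, a database write).
    private void drain(String streamId, BlockingQueue<Object> queue) {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Object event = queue.take();
                processEvent(streamId, event);          // hypothetical pipeline call
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void processEvent(String streamId, Object event) {
        // Placeholder for the event processing pipeline and the final output I/O.
    }
}
```

Because each onEvent call blocks when the per-stream queue is full, a single slow consumer (for example, a slow database write) is enough to stall all the Databridge core threads, which is exactly the situation analyzed in the following section.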
Analyzing a congested system
A congested system is often indicated by the EventQueueFullException error. If this error occurs repeatedly after applying the recommended configurations to the data-bridge transport, a likely cause is that the event processing pipeline is slow and cannot keep up with the incoming event rate. To diagnose such a scenario, capture 3-5 consecutive thread dumps of the system.
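One simple way to capture the dumps is with the jstack tool that ships with the JDK. The interval and file names below are arbitrary choices; <pid> stands for the process ID of the server JVM.

```
# Capture five consecutive thread dumps of the server JVM, ten seconds apart.
# Replace <pid> with the process ID of the JVM (e.g. obtained via jps).
for i in 1 2 3 4 5; do
  jstack -l <pid> > threaddump-$i.txt
  sleep 10
done
```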
When analyzing the thread dumps, you can start with the threads named Databridge-Core-pool-x-thread-x, because they are clearly named and easy to find, and there are only ten of them by default. When the system is congested, you can find all of these threads waiting to push events into the BlockingEventQueue.
"DataBridge-Core-pool-1-thread-10" #463 prio=5 os_prio=0 tid=0x00007f114812d000 nid=0x3d88 waiting on condition [0x00007f10cac4b000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00000004cba0e358> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
    at org.wso2.carbon.event.receiver.core.internal.management.BlockingEventQueue.put(BlockingEventQueue.java:66)
    at org.wso2.carbon.event.receiver.core.internal.management.QueueInputEventDispatcher.onEvent(QueueInputEventDispatcher.java:74)
    at org.wso2.carbon.event.receiver.core.internal.EventReceiver.sendEvent(EventReceiver.java:298)
    at org.wso2.carbon.event.receiver.core.internal.EventReceiver.processTypedEvent(EventReceiver.java:260)
    at org.wso2.carbon.event.receiver.core.internal.EventReceiver$TypedEventSubscription.onEvent(EventReceiver.java:362)
    at org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime.onEvent(InputAdapterRuntime.java:110)
    at org.wso2.carbon.event.input.adapter.wso2event.internal.ds.WSO2EventAdapterServiceDS$1.receive(WSO2EventAdapterServiceDS.java:92)
    at org.wso2.carbon.databridge.core.internal.queue.QueueWorker.run(QueueWorker.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
When all the Databridge core threads are waiting as shown above, there are no consumers for the Event Blocking Queue, and that queue fills up. This in turn causes the transport threads to wait until there is space in the Event Blocking Queue to hold the intercepted events, resulting in the following stack trace.

"pool-50-thread-7" #652 prio=5 os_prio=0 tid=0x00007f113802f000 nid=0x3e7c waiting for monitor entry [0x00007f10cb453000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.wso2.carbon.databridge.core.internal.queue.EventBlockingQueue.put(EventBlockingQueue.java:49)
    - waiting to lock <0x00000004c1df21e8> (a org.wso2.carbon.databridge.core.internal.queue.EventBlockingQueue)
    at org.wso2.carbon.databridge.core.internal.queue.EventBlockingQueue.put(EventBlockingQueue.java:33)
    at org.wso2.carbon.databridge.core.internal.queue.EventQueue.publish(EventQueue.java:60)
    at org.wso2.carbon.databridge.core.internal.EventDispatcher.publish(EventDispatcher.java:202)
    at org.wso2.carbon.databridge.core.DataBridge.publish(DataBridge.java:248)
    at org.wso2.carbon.databridge.receiver.thrift.service.ThriftEventTransmissionServiceImpl.publish(ThriftEventTransmissionServiceImpl.java:108)
    at org.wso2.carbon.databridge.commons.thrift.service.general.ThriftEventTransmissionService$Processor$publish.getResult(ThriftEventTransmissionService.java:522)
    at org.wso2.carbon.databridge.commons.thrift.service.general.ThriftEventTransmissionService$Processor$publish.getResult(ThriftEventTransmissionService.java:506)
    at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
    at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
    at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:285)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
When the above occurs, the Thrift/Binary transport cannot accept any more incoming events. This results in an EventQueueFull exception at the sending client's end (such as WSO2 EI, WSO2 APIM, and WSO2 IS).
To address this, you need to check the Blocking Event Queues. A blockage at the databridge Event Blocking Queue suggests that one of the Blocking Event Queues in the QueueInputEventDispatchers is full, and the Databridge threads are waiting to put events into that queue.
To identify the culprit, check all the QueueInputEventDispatcher threads, whose names follow the Thread pool-component -QueueInputEventDispatcher.ExecutorService format, and find the thread that is blocked in an I/O or some other heavy operation. To confirm the diagnosis, analyze multiple consecutive thread dumps.
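A quick way to pull those threads out of each dump is a plain text search on the dispatcher name. The command below assumes the dumps were saved as threaddump-*.txt, as in the earlier jstack example; adjust the number of context lines to capture the full stacks.

```
# Print each QueueInputEventDispatcher consumer thread with ~30 lines of its stack,
# so the same thread can be compared across consecutive dumps.
grep -A 30 "QueueInputEventDispatcher" threaddump-*.txt
```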
In this example, the thread is blocked while executing a database prepared statement initiated by the InsertIntoTable call. Therefore, in this specific scenario, you need to analyze the database operation and take steps to optimize that flow.
On the other hand, if the Databridge core threads are simply waiting at getTask(), it means that the EventBlockingQueue is empty and no events are getting delivered to the Databridge layer. In such a scenario, you need to check the Thrift/Binary transport thread pool. When the EventBlockingQueue is empty, you will typically find the transport threads idling on socketRead(), waiting for data from the publishing clients, as shown in the trace below.

"pool-50-thread-12" #11748 prio=5 os_prio=0 tid=0x00007f1138040800 nid=0x4c6d runnable [0x00007f10b6f7b000]
   java.lang.Thread.State: RUNNABLE
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    - locked <0x00000004e4088148> (a java.io.BufferedInputStream)
    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
    at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
    at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
    at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
    at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:27)
    at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:285)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)