/*
 * Decompiled with CFR 0.152.
 */
package aurora.application.task;

import aurora.application.features.msg.IConsumer;
import aurora.application.features.msg.IMessage;
import aurora.application.features.msg.IMessageListener;
import aurora.application.features.msg.IMessageStub;
import aurora.application.features.msg.INoticerConsumer;
import aurora.application.features.msg.Message;
import aurora.database.FetchDescriptor;
import aurora.database.service.BusinessModelService;
import aurora.database.service.IDatabaseServiceFactory;
import aurora.database.service.SqlServiceContext;
import aurora.service.IServiceFactory;
import aurora.service.ServiceInvoker;
import aurora.service.ServiceThreadLocal;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.io.Reader;
import java.sql.CallableStatement;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import javax.sql.DataSource;
import uncertain.composite.CompositeLoader;
import uncertain.composite.CompositeMap;
import uncertain.core.ILifeCycle;
import uncertain.exception.BuiltinExceptionFactory;
import uncertain.logging.ILogger;
import uncertain.logging.LoggingContext;
import uncertain.ocm.AbstractLocatableObject;
import uncertain.ocm.IObjectRegistry;
import uncertain.proc.IProcedureManager;
import uncertain.proc.Procedure;

public class TaskHandler
extends AbstractLocatableObject
implements ILifeCycle,
IMessageListener {
    public static final String LINE_SEPARATOR = System.getProperty("line.separator");
    public static final String DEFAULT_TOPIC = "task";
    public static final String DEFAULT_MESSAGE = "task_message";
    private IObjectRegistry mRegistry;
    private String oldTaskBM;
    private String fetchTaskBM;
    private String updateTaskBM;
    private String finishTaskBM;
    private int threadCount = 2;
    private IDatabaseServiceFactory databaseServiceFactory;
    private DataSource dataSource;
    private IProcedureManager procedureManager;
    private IServiceFactory serviceFactory;
    private ILogger logger;
    private boolean running = true;
    private Queue<CompositeMap> taskQueue = new ConcurrentLinkedQueue<CompositeMap>();
    private ExecutorService mainThreadPool;
    private TaskExecutorManager taskExecutorManager;
    protected String topic = "task";
    protected String message = "task_message";
    private Queue<Connection> connectionQueue = new ConcurrentLinkedQueue<Connection>();
    private Object fetchNewTaskLock = new Object();

    public TaskHandler(IObjectRegistry registry) {
        this.mRegistry = registry;
    }

    public void onInitialize() {
        IConsumer consumer;
        this.logger = LoggingContext.getLogger(this.getClass().getCanonicalName(), this.mRegistry);
        if (this.fetchTaskBM == null) {
            throw BuiltinExceptionFactory.createAttributeMissing(this, "fetchTaskBM");
        }
        if (this.updateTaskBM == null) {
            throw BuiltinExceptionFactory.createAttributeMissing(this, "updateTaskBM");
        }
        if (this.finishTaskBM == null) {
            throw BuiltinExceptionFactory.createAttributeMissing(this, "finishTaskBM");
        }
        this.dataSource = (DataSource)this.mRegistry.getInstanceOfType(DataSource.class);
        if (this.dataSource == null) {
            throw BuiltinExceptionFactory.createInstanceNotFoundException(this, DataSource.class, this.getClass().getName());
        }
        this.databaseServiceFactory = (IDatabaseServiceFactory)this.mRegistry.getInstanceOfType(IDatabaseServiceFactory.class);
        if (this.databaseServiceFactory == null) {
            throw BuiltinExceptionFactory.createInstanceNotFoundException(this, IDatabaseServiceFactory.class, this.getClass().getName());
        }
        this.procedureManager = (IProcedureManager)this.mRegistry.getInstanceOfType(IProcedureManager.class);
        if (this.procedureManager == null) {
            throw BuiltinExceptionFactory.createInstanceNotFoundException(this, IProcedureManager.class, this.getClass().getName());
        }
        this.serviceFactory = (IServiceFactory)this.mRegistry.getInstanceOfType(IServiceFactory.class);
        if (this.serviceFactory == null) {
            throw BuiltinExceptionFactory.createInstanceNotFoundException(this, IServiceFactory.class, this.getClass().getName());
        }
        IMessageStub stub = (IMessageStub)this.mRegistry.getInstanceOfType(IMessageStub.class);
        if (stub == null) {
            throw BuiltinExceptionFactory.createInstanceNotFoundException(this, IMessageStub.class, this.getClass().getName());
        }
        if (!stub.isStarted()) {
            this.logger.warning("JMS MessageStub is not started, please check the configuration.");
        }
        if ((consumer = stub.getConsumer(this.topic)) == null) {
            throw new IllegalStateException("MessageStub does not define the topic '" + this.topic + "', please check the configuration.");
        }
        if (!(consumer instanceof INoticerConsumer)) {
            throw BuiltinExceptionFactory.createInstanceTypeWrongException(this.getOriginSource(), INoticerConsumer.class, IConsumer.class);
        }
        ((INoticerConsumer)consumer).addListener(this.message, this);
        Runtime.getRuntime().addShutdownHook(new Thread(){

            @Override
            public void run() {
                try {
                    TaskHandler.this.shutdown();
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        this.mainThreadPool = Executors.newFixedThreadPool(2);
        TaskFetcher taskFetcher = new TaskFetcher();
        this.taskExecutorManager = new TaskExecutorManager(this.threadCount);
        this.mainThreadPool.submit(taskFetcher);
        this.mainThreadPool.submit(this.taskExecutorManager);
        this.resetUnfinishedTaskStatus(stub);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void resetUnfinishedTaskStatus(IMessageStub messageStub) {
        Connection connection = this.getConnection();
        if (this.oldTaskBM != null) {
            try {
                CompositeMap context = new CompositeMap();
                Message msg = new Message(this.message, null);
                this.executeBM(connection, this.oldTaskBM, context, new CompositeMap());
                messageStub.getDispatcher().send(this.topic, msg, context);
                context.clear();
            }
            catch (Exception e) {
                this.logger.log(Level.SEVERE, "", e);
            }
            finally {
                this.closeConnection(connection);
            }
        }
    }

    public void executeTask(Connection connection, CompositeMap task) throws Exception {
        CompositeMap context = this.getContext(task);
        if (context == null) {
            context = new CompositeMap();
        }
        ServiceThreadLocal.setCurrentThreadContext(context);
        int task_id = task.getInt("task_id");
        if (task_id == 0) {
            throw BuiltinExceptionFactory.createAttributeMissing(null, "task_id");
        }
        String task_type = task.getString("task_type");
        String proc_file_path = task.getString("proc_file_path");
        CompositeMap proc_content = this.getProcContext(task);
        String sql = task.getString("sql");
        if ("JAVA".equals(task_type)) {
            if (proc_file_path != null && !proc_file_path.equals("")) {
                this.executeProc(proc_file_path, task_id, context, connection);
            } else {
                if (proc_content == null) {
                    throw BuiltinExceptionFactory.createOneAttributeMissing(null, "proc_file_path,proc_content");
                }
                this.executeProc(proc_content, task_id, context, connection);
            }
        } else if ("PROCEDURE".equals(task_type)) {
            if (sql == null || "".equals(sql)) {
                throw BuiltinExceptionFactory.createAttributeMissing(null, "sql");
            }
            this.execDbProc(connection, context, sql);
        } else if ("FUNCTION".equals(task_type)) {
            if (sql == null || "".equals(sql)) {
                throw BuiltinExceptionFactory.createAttributeMissing(null, "sql");
            }
            this.execDbFun(connection, context, sql);
        } else {
            throw new IllegalArgumentException("The " + task_type + " is not supported!");
        }
    }

    private CompositeMap loadFromString(String content) throws Exception {
        if (content == null) {
            return null;
        }
        CompositeMap context = null;
        if (content != null && !"".equals(content)) {
            context = new CompositeLoader().loadFromString(content, "UTF-8");
            this.clearInstance(context);
        }
        return context;
    }

    private CompositeMap getProcContext(CompositeMap taskRecord) throws Exception {
        if (taskRecord == null) {
            return null;
        }
        Object proc_content = taskRecord.get("proc_content");
        if (proc_content == null) {
            return null;
        }
        String str_Proc_content = null;
        str_Proc_content = proc_content instanceof Clob ? this.clobToString((Clob)proc_content) : proc_content.toString();
        return this.loadFromString(str_Proc_content);
    }

    private CompositeMap getContext(CompositeMap taskRecord) throws Exception {
        if (taskRecord == null) {
            return null;
        }
        Object context = taskRecord.get("context");
        if (context == null) {
            return null;
        }
        String strContext = null;
        strContext = context instanceof Clob ? this.clobToString((Clob)context) : context.toString();
        return this.loadFromString(strContext);
    }

    private String clobToString(Clob clob) throws Exception {
        Reader inStreamDoc = clob.getCharacterStream();
        char[] tempDoc = new char[(int)clob.length()];
        inStreamDoc.read(tempDoc);
        inStreamDoc.close();
        return new String(tempDoc);
    }

    private void clearInstance(CompositeMap context) {
        if (context == null) {
            return;
        }
        Iterator it = context.entrySet().iterator();
        if (it == null) {
            return;
        }
        while (it.hasNext()) {
            it.next();
            it.remove();
        }
    }

    public CompositeMap queryBM(Connection connection, String bm_name, CompositeMap context, CompositeMap parameterMap) throws Exception {
        SqlServiceContext sqlContext;
        CompositeMap localContext = context;
        if (localContext == null) {
            localContext = new CompositeMap();
        }
        if ((sqlContext = SqlServiceContext.createSqlServiceContext(localContext)) == null) {
            throw new RuntimeException("Can not create SqlServiceContext for context:" + localContext.toXML());
        }
        sqlContext.setConnection(connection);
        BusinessModelService service = this.databaseServiceFactory.getModelService(bm_name, localContext);
        CompositeMap resultMap = service.queryAsMap(parameterMap, FetchDescriptor.fetchAll());
        return resultMap;
    }

    public void executeBM(Connection connection, String bm_name, CompositeMap context, CompositeMap parameterMap) throws Exception {
        SqlServiceContext sqlContext;
        CompositeMap localContext = context;
        if (localContext == null) {
            localContext = new CompositeMap();
        }
        if ((sqlContext = SqlServiceContext.createSqlServiceContext(localContext)) == null) {
            throw new RuntimeException("Can not create SqlServiceContext for context:" + localContext.toXML());
        }
        connection.setAutoCommit(false);
        sqlContext.setConnection(connection);
        try {
            BusinessModelService service = this.databaseServiceFactory.getModelService(bm_name, localContext);
            service.execute(parameterMap);
            connection.commit();
        }
        catch (Exception ex) {
            this.rollbackConnection(connection);
            throw new RuntimeException(ex);
        }
    }

    protected void executeProc(String procedure_name, int taskId, CompositeMap context, Connection connection) {
        this.logger.log(Level.CONFIG, "load procedure:{0}", new Object[]{procedure_name});
        Procedure proc = null;
        try {
            proc = this.procedureManager.loadProcedure(procedure_name);
            this.executeProc(taskId, proc, context, connection);
        }
        catch (Exception ex) {
            throw BuiltinExceptionFactory.createResourceLoadException(null, procedure_name, ex);
        }
    }

    protected void executeProc(CompositeMap procedure_config, int taskId, CompositeMap context, Connection connection) {
        this.logger.log(Level.CONFIG, "load procedure:{0}", new Object[]{procedure_config.toXML()});
        Procedure proc = null;
        try {
            proc = this.procedureManager.createProcedure(procedure_config);
            this.executeProc(taskId, proc, context, connection);
        }
        catch (Exception ex) {
            throw BuiltinExceptionFactory.createResourceLoadException(null, String.valueOf(taskId), ex);
        }
    }

    protected void executeProc(int taskId, Procedure proc, CompositeMap context, Connection connection) {
        if (proc == null) {
            throw new IllegalArgumentException("Procedure can not be null!");
        }
        try {
            String name = "task." + taskId;
            if (context != null) {
                context.putObject("/parameter/@task_id", (Object)taskId, true);
                ServiceInvoker.invokeProcedureWithTransaction(name, proc, this.serviceFactory, context);
            } else {
                ServiceInvoker.invokeProcedureWithTransaction(name, proc, this.serviceFactory);
            }
        }
        catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    private String execDbFun(Connection connection, CompositeMap context, String function) throws Exception {
        SqlServiceContext sqlContext = SqlServiceContext.createSqlServiceContext(context);
        if (sqlContext == null) {
            throw new RuntimeException("Can not create SqlServiceContext for context:" + context.toXML());
        }
        sqlContext.setConnection(connection);
        String errorMessage = null;
        CallableStatement proc = null;
        try {
            connection.setAutoCommit(false);
            proc = connection.prepareCall("{call ? := " + function + "}");
            proc.registerOutParameter(1, 12);
            proc.execute();
            errorMessage = proc.getString(1);
            if (errorMessage == null || "".equals(errorMessage)) {
                connection.commit();
            } else {
                connection.rollback();
            }
            proc.close();
            this.closeStatement(proc);
        }
        catch (Exception e) {
            try {
                this.rollbackConnection(connection);
                throw new RuntimeException(e);
            }
            catch (Throwable throwable) {
                this.closeStatement(proc);
                throw throwable;
            }
        }
        return errorMessage;
    }

    private void execDbProc(Connection connection, CompositeMap context, String executePkg) throws Exception {
        SqlServiceContext sqlContext = SqlServiceContext.createSqlServiceContext(context);
        if (sqlContext == null) {
            throw new RuntimeException("Can not create SqlServiceContext for context:" + context.toXML());
        }
        sqlContext.setConnection(connection);
        CallableStatement proc = null;
        try {
            connection.setAutoCommit(false);
            proc = connection.prepareCall("{call " + executePkg + "}");
            proc.execute();
            connection.commit();
            proc.close();
            connection.setAutoCommit(true);
            this.closeStatement(proc);
        }
        catch (Exception e) {
            try {
                this.rollbackConnection(connection);
                throw new RuntimeException(e);
            }
            catch (Throwable throwable) {
                this.closeStatement(proc);
                throw throwable;
            }
        }
    }

    private void rollbackConnection(Connection dbConn) {
        if (dbConn == null) {
            return;
        }
        try {
            dbConn.rollback();
        }
        catch (SQLException ex) {
            this.logger.log(Level.SEVERE, "", ex);
        }
    }

    private void closeStatement(Statement stmt) {
        if (stmt == null) {
            return;
        }
        try {
            stmt.close();
        }
        catch (SQLException ex) {
            this.logger.log(Level.SEVERE, "", ex);
        }
    }

    private void closeConnection(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        }
        catch (SQLException ex) {
            this.logger.log(Level.SEVERE, "", ex);
        }
    }

    public String getOldTaskBM() {
        return this.oldTaskBM;
    }

    public void setOldTaskBM(String oldTaskBM) {
        this.oldTaskBM = oldTaskBM;
    }

    public String getFetchTaskBM() {
        return this.fetchTaskBM;
    }

    public void setFetchTaskBM(String fetchTaskBM) {
        this.fetchTaskBM = fetchTaskBM;
    }

    public String getUpdateTaskBM() {
        return this.updateTaskBM;
    }

    public void setUpdateTaskBM(String updateTaskBM) {
        this.updateTaskBM = updateTaskBM;
    }

    public String getFinishTaskBM() {
        return this.finishTaskBM;
    }

    public void setFinishTaskBM(String finishTaskBM) {
        this.finishTaskBM = finishTaskBM;
    }

    public int getThreadCount() {
        return this.threadCount;
    }

    public void setThreadCount(int threadCount) {
        this.threadCount = threadCount;
    }

    public String getTopic() {
        return this.topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getMessage() {
        return this.message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    private Connection getConnection() {
        Connection connection = null;
        try {
            connection = this.dataSource.getConnection();
        }
        catch (SQLException e) {
            throw new IllegalStateException(e);
        }
        if (connection == null) {
            throw new IllegalStateException("Can't get database connection from dataSource.");
        }
        return connection;
    }

    private String getFullStackTrace(Throwable exception) {
        String message = this.getExceptionStackTrace(exception);
        return message;
    }

    private String getExceptionStackTrace(Throwable exception) {
        if (exception == null) {
            return null;
        }
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        PrintStream pw = new PrintStream(baos);
        exception.printStackTrace(pw);
        pw.close();
        return baos.toString();
    }

    @Override
    public boolean startup() {
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void shutdown() {
        this.running = false;
        Object object = this.fetchNewTaskLock;
        synchronized (object) {
            this.fetchNewTaskLock.notify();
        }
        try {
            this.taskExecutorManager.shutdown();
        }
        catch (Exception e) {
            this.logger.log(Level.SEVERE, "", e);
        }
        if (this.mainThreadPool != null) {
            List<Runnable> taskList = this.mainThreadPool.shutdownNow();
            for (Runnable task : taskList) {
                if (task instanceof ILifeCycle) {
                    ((ILifeCycle)((Object)task)).shutdown();
                    continue;
                }
                this.logger.log(Level.SEVERE, "Task " + task.toString() + " can not shutdown!");
            }
        }
        if (this.connectionQueue != null) {
            Connection connection = this.connectionQueue.poll();
            while (connection != null) {
                this.closeConnection(connection);
                connection = this.connectionQueue.poll();
            }
        }
    }

    private void addToTaskQueue(CompositeMap task) {
        if (task == null) {
            return;
        }
        this.taskQueue.add(task);
    }

    private CompositeMap popTaskQueue() {
        return this.taskQueue.poll();
    }

    private int getTaskId(CompositeMap taskRecord) {
        if (taskRecord == null) {
            return -1;
        }
        return taskRecord.getInt("task_id", -1);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onMessage(IMessage message) {
        try {
            if (this.taskQueue.size() > 0) {
                return;
            }
            this.logger.log(Level.CONFIG, "receive a messsage:" + message.getText());
            Object object = this.fetchNewTaskLock;
            synchronized (object) {
                this.fetchNewTaskLock.notify();
            }
        }
        catch (Exception e) {
            this.logger.log(Level.WARNING, "Can not add the task:" + message);
        }
    }

    class TaskExecutor
    implements Callable<String> {
        private CompositeMap taskRecord;
        private ExecutorService timeOutService;

        public TaskExecutor(ExecutorService timeOutService, CompositeMap taskRecord) {
            this.timeOutService = timeOutService;
            this.taskRecord = taskRecord;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public String call() throws Exception {
            Connection connection = (Connection)TaskHandler.this.connectionQueue.poll();
            while (connection == null) {
                TaskHandler.this.logger.log(Level.SEVERE, "Can not get database connection!");
                Thread.sleep(1000L);
                connection = (Connection)TaskHandler.this.connectionQueue.poll();
            }
            CompositeMap para = new CompositeMap();
            CompositeMap context = new CompositeMap();
            para.put("task_id", (Object)TaskHandler.this.getTaskId(this.taskRecord));
            try {
                context = TaskHandler.this.getContext(this.taskRecord);
                ServiceThreadLocal.setCurrentThreadContext(context);
                TaskHandler.this.logger.log(Level.CONFIG, "begin to execute task,task_id=" + TaskHandler.this.getTaskId(this.taskRecord));
                int execute_time = this.taskRecord.getInt("retry_time", 0) + 1;
                int current_retry_time = this.taskRecord.getInt("current_retry_time", 0);
                int time_out = this.taskRecord.getInt("time_out");
                StringBuilder excepiton = new StringBuilder();
                String errorMessage = null;
                para.put("status", "running");
                try {
                    TaskHandler.this.executeBM(connection, TaskHandler.this.updateTaskBM, context, para);
                }
                catch (Throwable e) {
                    TaskHandler.this.logger.log(Level.SEVERE, "", e);
                }
                while (current_retry_time < execute_time) {
                    block21: {
                        if (current_retry_time > 0) {
                            try {
                                para.put("current_retry_time", (Object)current_retry_time);
                                TaskHandler.this.executeBM(connection, TaskHandler.this.updateTaskBM, context, para);
                            }
                            catch (Throwable e) {
                                TaskHandler.this.logger.log(Level.SEVERE, "", e);
                            }
                        }
                        try {
                            if (time_out != 0) {
                                errorMessage = this.executeTimeOutTask(connection, time_out);
                                if (errorMessage == null || errorMessage.isEmpty()) {
                                    excepiton = null;
                                    break;
                                }
                                excepiton.append(errorMessage).append(LINE_SEPARATOR);
                                break block21;
                            }
                            TaskHandler.this.executeTask(connection, this.taskRecord);
                            excepiton = null;
                            break;
                        }
                        catch (Exception e) {
                            excepiton.append(TaskHandler.this.getFullStackTrace(e)).append(LINE_SEPARATOR);
                        }
                    }
                    ++current_retry_time;
                }
                if (excepiton != null && excepiton.length() != 0) {
                    para.put("exception", excepiton.toString());
                    TaskHandler.this.logger.log(Level.SEVERE, excepiton.toString());
                }
                TaskHandler.this.logger.log(Level.CONFIG, "finish task,task_id=" + TaskHandler.this.getTaskId(this.taskRecord));
                TaskHandler.this.logger.log(Level.CONFIG, "pass parameter =" + para.toXML());
                this.updateFinishStatus(connection, context, para);
            }
            catch (Exception e) {
                TaskHandler.this.logger.log(Level.SEVERE, "", e);
                para.put("exception", TaskHandler.this.getFullStackTrace(e));
                this.updateFinishStatus(connection, context, para);
            }
            finally {
                TaskHandler.this.connectionQueue.add(connection);
            }
            if (TaskHandler.this.taskQueue.size() == 0) {
                Object object = TaskHandler.this.fetchNewTaskLock;
                synchronized (object) {
                    TaskHandler.this.fetchNewTaskLock.notify();
                }
            }
            ServiceThreadLocal.remove();
            return "finished";
        }

        private void updateFinishStatus(Connection connection, CompositeMap context, CompositeMap newPara) {
            try {
                TaskHandler.this.executeBM(connection, TaskHandler.this.finishTaskBM, context, newPara);
            }
            catch (Throwable e) {
                TaskHandler.this.logger.log(Level.SEVERE, "", e);
            }
        }

        private String executeTimeOutTask(Connection connection, int timeOut) {
            CallableTask callableTask = new CallableTask(connection, this.taskRecord);
            StringBuilder excepiton = new StringBuilder();
            Future<String> future = this.timeOutService.submit(callableTask);
            try {
                String result = future.get(timeOut, TimeUnit.MILLISECONDS);
                return result;
            }
            catch (Exception e) {
                boolean successful = future.cancel(true);
                if (!successful) {
                    TaskHandler.this.logger.log(Level.WARNING, "Can not cancel the task:" + TaskHandler.this.getTaskId(this.taskRecord));
                    return excepiton.toString();
                }
                excepiton.append(TaskHandler.this.getFullStackTrace(e));
                return excepiton.toString();
            }
        }

        class CallableTask
        implements Callable<String> {
            private CompositeMap taskRecord;
            private Connection connection;

            public CallableTask(Connection connection, CompositeMap taskRecord) {
                this.connection = connection;
                this.taskRecord = taskRecord;
            }

            @Override
            public String call() throws Exception {
                try {
                    TaskHandler.this.executeTask(this.connection, this.taskRecord);
                }
                catch (Exception e) {
                    return TaskHandler.this.getFullStackTrace(e);
                }
                return null;
            }
        }
    }

    class TaskExecutorManager
    implements Callable<String>,
    ILifeCycle {
        int mThreadCount;
        ExecutorService timeOutService;
        ExecutorService handleTaskService;

        public TaskExecutorManager(int threadCount) {
            this.mThreadCount = threadCount;
        }

        @Override
        public String call() throws Exception {
            try {
                this.timeOutService = Executors.newCachedThreadPool();
                this.handleTaskService = Executors.newFixedThreadPool(this.mThreadCount);
                for (int i = 0; i < this.mThreadCount; ++i) {
                    TaskHandler.this.connectionQueue.add(TaskHandler.this.getConnection());
                }
                while (TaskHandler.this.running) {
                    CompositeMap taskRecord = TaskHandler.this.popTaskQueue();
                    try {
                        if (taskRecord == null || taskRecord.isEmpty()) {
                            Thread.sleep(1000L);
                            continue;
                        }
                        TaskHandler.this.logger.log(Level.CONFIG, "get a task record from queue,task is" + LINE_SEPARATOR + LINE_SEPARATOR + taskRecord.toXML());
                        Object task_id = taskRecord.get("task_id");
                        if (task_id == null || "null".equals(task_id)) {
                            Thread.sleep(1000L);
                            continue;
                        }
                        TaskExecutor taskExecutor = new TaskExecutor(this.timeOutService, taskRecord);
                        this.handleTaskService.submit(taskExecutor);
                    }
                    catch (InterruptedException e) {
                    }
                    catch (Throwable e) {
                        TaskHandler.this.logger.log(Level.SEVERE, "", e);
                    }
                }
            }
            catch (Exception e) {
                TaskHandler.this.logger.log(Level.SEVERE, "", e);
            }
            return "finished";
        }

        @Override
        public boolean startup() {
            return true;
        }

        @Override
        public void shutdown() {
            if (this.timeOutService != null) {
                this.timeOutService.shutdownNow();
            }
            if (this.handleTaskService != null) {
                this.handleTaskService.shutdownNow();
            }
        }
    }

    class TaskFetcher
    implements Callable<String> {
        Connection connection;

        TaskFetcher() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        public String call() throws Exception {
            try {
                int failedTime = 0;
                int retryTime = 10;
                boolean hasNext = false;
                this.connection = TaskHandler.this.getConnection();
                while (TaskHandler.this.running) {
                    Object object = TaskHandler.this.fetchNewTaskLock;
                    synchronized (object) {
                        block16: {
                            if (!hasNext) {
                                TaskHandler.this.fetchNewTaskLock.wait();
                            }
                            if (!TaskHandler.this.running) {
                                return "finished";
                            }
                            CompositeMap task = new CompositeMap();
                            CompositeMap context = new CompositeMap();
                            try {
                                int record_count;
                                TaskHandler.this.executeBM(this.connection, TaskHandler.this.fetchTaskBM, context, task);
                                if (task == null || TaskHandler.this.getTaskId(task) == -1) {
                                    continue;
                                }
                                TaskHandler.this.logger.log(Level.CONFIG, "add record to queue,task_id=" + TaskHandler.this.getTaskId(task));
                                TaskHandler.this.addToTaskQueue(task);
                                hasNext = false;
                                if (TaskHandler.this.connectionQueue.size() > 0 || (record_count = task.getInt("record_count", -1)) <= 1) break block16;
                                hasNext = true;
                            }
                            catch (Exception e) {
                                TaskHandler.this.logger.log(Level.SEVERE, "", e);
                                if (++failedTime >= retryTime) {
                                    TaskHandler.this.logger.log(Level.SEVERE, "It has failed " + failedTime + " time when get task from database! It will quit now.");
                                    return "finished";
                                }
                                TaskHandler.this.closeConnection(this.connection);
                                this.connection = TaskHandler.this.getConnection();
                                TaskHandler.this.logger.log(Level.SEVERE, "It has failed " + failedTime + " time when get task from database,please check the configuration!");
                                continue;
                            }
                        }
                    }
                }
                return "finished";
            }
            catch (InterruptedException e) {
                return "finished";
            }
            catch (Exception e) {
                TaskHandler.this.logger.log(Level.SEVERE, "", e);
                return "finished";
            }
            finally {
                TaskHandler.this.closeConnection(this.connection);
            }
        }
    }
}

