/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.canal.client.impl.running;

import com.alibaba.otter.canal.client.impl.running.ClientRunningData;
import com.alibaba.otter.canal.client.impl.running.ClientRunningListener;
import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
import com.alibaba.otter.canal.common.utils.BooleanMutex;
import com.alibaba.otter.canal.common.utils.JsonUtils;
import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import java.net.InetSocketAddress;
import java.text.MessageFormat;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class ClientRunningMonitor
extends AbstractCanalLifeCycle {
    private static final Logger logger = LoggerFactory.getLogger(ClientRunningMonitor.class);
    private ZkClientx zkClient;
    private String destination;
    private ClientRunningData clientData;
    private IZkDataListener dataListener;
    private BooleanMutex mutex = new BooleanMutex(Boolean.valueOf(false));
    private volatile boolean release = false;
    private volatile ClientRunningData activeData;
    private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);
    private ClientRunningListener listener;
    private int delayTime = 5;

    public ClientRunningMonitor() {
        this.dataListener = new IZkDataListener(){

            public void handleDataChange(String dataPath, Object data) throws Exception {
                MDC.put((String)"destination", (String)ClientRunningMonitor.this.destination);
                ClientRunningData runningData = (ClientRunningData)JsonUtils.unmarshalFromByte((byte[])((byte[])data), ClientRunningData.class);
                if (!ClientRunningMonitor.this.isMine(runningData.getAddress())) {
                    ClientRunningMonitor.this.mutex.set(Boolean.valueOf(false));
                }
                if (!runningData.isActive() && ClientRunningMonitor.this.isMine(runningData.getAddress())) {
                    ClientRunningMonitor.this.release = true;
                    ClientRunningMonitor.this.releaseRunning();
                }
                ClientRunningMonitor.this.activeData = runningData;
            }

            public void handleDataDeleted(String dataPath) throws Exception {
                MDC.put((String)"destination", (String)ClientRunningMonitor.this.destination);
                ClientRunningMonitor.this.mutex.set(Boolean.valueOf(false));
                ClientRunningMonitor.this.processActiveExit();
                if (!ClientRunningMonitor.this.release && ClientRunningMonitor.this.activeData != null && ClientRunningMonitor.this.isMine(ClientRunningMonitor.this.activeData.getAddress())) {
                    ClientRunningMonitor.this.initRunning();
                } else {
                    ClientRunningMonitor.this.delayExector.schedule(new Runnable(){

                        @Override
                        public void run() {
                            ClientRunningMonitor.this.initRunning();
                        }
                    }, (long)ClientRunningMonitor.this.delayTime, TimeUnit.SECONDS);
                }
            }
        };
    }

    public void start() {
        super.start();
        String path = ZookeeperPathUtils.getDestinationClientRunning((String)this.destination, (short)this.clientData.getClientId());
        this.zkClient.subscribeDataChanges(path, this.dataListener);
        this.initRunning();
    }

    public void stop() {
        super.stop();
        String path = ZookeeperPathUtils.getDestinationClientRunning((String)this.destination, (short)this.clientData.getClientId());
        this.zkClient.unsubscribeDataChanges(path, this.dataListener);
        this.releaseRunning();
    }

    public synchronized void initRunning() {
        if (!this.isStart()) {
            return;
        }
        String path = ZookeeperPathUtils.getDestinationClientRunning((String)this.destination, (short)this.clientData.getClientId());
        byte[] bytes = JsonUtils.marshalToByte((Object)this.clientData);
        try {
            this.mutex.set(Boolean.valueOf(false));
            this.zkClient.create(path, (Object)bytes, CreateMode.EPHEMERAL);
            this.processActiveEnter();
            this.activeData = this.clientData;
            this.mutex.set(Boolean.valueOf(true));
        }
        catch (ZkNodeExistsException e) {
            bytes = (byte[])this.zkClient.readData(path, true);
            if (bytes == null) {
                this.initRunning();
            } else {
                this.activeData = (ClientRunningData)JsonUtils.unmarshalFromByte((byte[])bytes, ClientRunningData.class);
                if (this.activeData.getAddress().contains(":") && this.isMine(this.activeData.getAddress())) {
                    this.mutex.set(Boolean.valueOf(true));
                }
            }
        }
        catch (ZkNoNodeException e) {
            this.zkClient.createPersistent(ZookeeperPathUtils.getClientIdNodePath((String)this.destination, (short)this.clientData.getClientId()), true);
            this.initRunning();
        }
        catch (Throwable t) {
            logger.error(MessageFormat.format("There is an error when execute initRunning method, with destination [{0}].", this.destination), t);
            this.releaseRunning();
            throw new CanalClientException("something goes wrong in initRunning method. ", t);
        }
    }

    public void waitForActive() throws InterruptedException {
        this.initRunning();
        this.mutex.get();
    }

    public boolean check() {
        String path = ZookeeperPathUtils.getDestinationClientRunning((String)this.destination, (short)this.clientData.getClientId());
        try {
            ClientRunningData eventData;
            byte[] bytes = (byte[])this.zkClient.readData(path);
            this.activeData = eventData = (ClientRunningData)JsonUtils.unmarshalFromByte((byte[])bytes, ClientRunningData.class);
            boolean result = this.isMine(this.activeData.getAddress());
            if (!result) {
                logger.warn("canal is running in [{}] , but not in [{}]", (Object)this.activeData.getAddress(), (Object)this.clientData.getAddress());
            }
            return result;
        }
        catch (ZkNoNodeException e) {
            logger.warn("canal is not run any in node");
            return false;
        }
        catch (ZkInterruptedException e) {
            logger.warn("canal check is interrupt");
            Thread.interrupted();
            return this.check();
        }
        catch (ZkException e) {
            logger.warn("canal check is failed");
            return false;
        }
    }

    public boolean releaseRunning() {
        if (this.check()) {
            String path = ZookeeperPathUtils.getDestinationClientRunning((String)this.destination, (short)this.clientData.getClientId());
            this.zkClient.delete(path);
            this.mutex.set(Boolean.valueOf(false));
            this.processActiveExit();
            return true;
        }
        return false;
    }

    private boolean isMine(String address) {
        return address.equals(this.clientData.getAddress());
    }

    private void processActiveEnter() {
        if (this.listener != null) {
            InetSocketAddress connectAddress = this.listener.processActiveEnter();
            String address = connectAddress.getAddress().getHostAddress() + ":" + connectAddress.getPort();
            this.clientData.setAddress(address);
            String path = ZookeeperPathUtils.getDestinationClientRunning((String)this.destination, (short)this.clientData.getClientId());
            byte[] bytes = JsonUtils.marshalToByte((Object)this.clientData);
            this.zkClient.writeData(path, (Object)bytes);
        }
    }

    private void processActiveExit() {
        if (this.listener != null) {
            this.listener.processActiveExit();
        }
    }

    public void setListener(ClientRunningListener listener) {
        this.listener = listener;
    }

    public void setDestination(String destination) {
        this.destination = destination;
    }

    public void setClientData(ClientRunningData clientData) {
        this.clientData = clientData;
    }

    public void setDelayTime(int delayTime) {
        this.delayTime = delayTime;
    }

    public void setZkClient(ZkClientx zkClient) {
        this.zkClient = zkClient;
    }
}

