/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.registry.server.data.change;

import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.cache.DatumCache;
import com.alipay.sofa.registry.server.data.cache.MergeResult;
import com.alipay.sofa.registry.server.data.change.ChangeData;
import com.alipay.sofa.registry.server.data.change.DataChangeTypeEnum;
import com.alipay.sofa.registry.server.data.change.DataSourceTypeEnum;
import com.alipay.sofa.registry.server.data.change.event.DataChangeEventCenter;
import com.alipay.sofa.registry.server.data.change.event.DataChangeEventQueue;
import com.alipay.sofa.registry.server.data.change.notify.IDataChangeNotifier;
import com.alipay.sofa.registry.server.data.executor.ExecutorFactory;
import java.util.List;
import java.util.concurrent.Executor;
import javax.annotation.Resource;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;

public class DataChangeHandler
implements InitializingBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(DataChangeHandler.class);
    @Autowired
    private DataServerConfig dataServerBootstrapConfig;
    @Autowired
    private DataChangeEventCenter dataChangeEventCenter;
    @Resource
    private List<IDataChangeNotifier> dataChangeNotifiers;

    public void afterPropertiesSet() {
        this.dataChangeEventCenter.init(this.dataServerBootstrapConfig);
        this.start();
    }

    public void start() {
        DataChangeEventQueue[] queues = this.dataChangeEventCenter.getQueues();
        int queueCount = queues.length;
        Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
        Executor notifyExecutor = ExecutorFactory.newFixedThreadPool(this.dataServerBootstrapConfig.getQueueCount() * 5, this.getClass().getSimpleName());
        for (int idx = 0; idx < queueCount; ++idx) {
            DataChangeEventQueue dataChangeEventQueue = queues[idx];
            String name = dataChangeEventQueue.getName();
            LOGGER.info("[DataChangeHandler] begin to notify datum in queue:{}", (Object)name);
            executor.execute(() -> {
                while (true) {
                    try {
                        while (true) {
                            ChangeData changeData = dataChangeEventQueue.take();
                            notifyExecutor.execute(new ChangeNotifier(changeData, name));
                        }
                    }
                    catch (Throwable e) {
                        LOGGER.error("[DataChangeHandler][{}] notify scheduler error", (Object)name, (Object)e);
                        continue;
                    }
                    break;
                }
            });
            LOGGER.info("[DataChangeHandler] notify datum in queue:{} success", (Object)name);
        }
    }

    private class ChangeNotifier
    implements Runnable {
        private ChangeData changeData;
        private String name;

        public ChangeNotifier(ChangeData changeData, String name) {
            this.changeData = changeData;
            this.name = name;
        }

        @Override
        public void run() {
            Datum datum = this.changeData.getDatum();
            String dataCenter = datum.getDataCenter();
            String dataInfoId = datum.getDataInfoId();
            long version = datum.getVersion();
            DataSourceTypeEnum sourceType = this.changeData.getSourceType();
            DataChangeTypeEnum changeType = this.changeData.getChangeType();
            try {
                if (sourceType == DataSourceTypeEnum.CLEAN) {
                    if (DatumCache.cleanDatum(dataCenter, dataInfoId)) {
                        LOGGER.info("[DataChangeHandler][{}] clean datum, dataCenter={}, dataInfoId={}, version={},sourceType={}, changeType={}", new Object[]{this.name, dataCenter, dataInfoId, version, sourceType, changeType});
                    }
                } else {
                    Long lastVersion = null;
                    if (sourceType == DataSourceTypeEnum.PUB_TEMP) {
                        this.notifyTempPub(datum, sourceType, changeType);
                        return;
                    }
                    MergeResult mergeResult = DatumCache.putDatum(changeType, datum);
                    lastVersion = mergeResult.getLastVersion();
                    if (lastVersion != null && lastVersion == -2L) {
                        LOGGER.error("[DataChangeHandler][{}] first put unPub datum into cache error, dataCenter={}, dataInfoId={}, version={}, sourceType={},isContainsUnPub={}", new Object[]{this.name, dataCenter, dataInfoId, version, sourceType, datum.isContainsUnPub()});
                        return;
                    }
                    boolean changeFlag = mergeResult.isChangeFlag();
                    LOGGER.info("[DataChangeHandler][{}] datum handle,datum={},dataCenter={}, dataInfoId={}, version={}, lastVersion={}, sourceType={}, changeType={},changeFlag={},isContainsUnPub={}", new Object[]{this.name, datum.hashCode(), dataCenter, dataInfoId, version, lastVersion, sourceType, changeType, changeFlag, datum.isContainsUnPub()});
                    if ((lastVersion == null || version != lastVersion) && changeFlag) {
                        for (IDataChangeNotifier notifier : DataChangeHandler.this.dataChangeNotifiers) {
                            if (!notifier.getSuitableSource().contains((Object)sourceType)) continue;
                            notifier.notify(datum, lastVersion);
                        }
                    }
                }
            }
            catch (Exception e) {
                LOGGER.error("[DataChangeHandler][{}] put datum into cache error, dataCenter={}, dataInfoId={}, version={}, sourceType={},isContainsUnPub={}", new Object[]{this.name, dataCenter, dataInfoId, version, sourceType, datum.isContainsUnPub(), e});
            }
        }

        private void notifyTempPub(Datum datum, DataSourceTypeEnum sourceType, DataChangeTypeEnum changeType) {
            String dataCenter = datum.getDataCenter();
            String dataInfoId = datum.getDataInfoId();
            long version = datum.getVersion();
            LOGGER.info("[DataChangeHandler][{}] datum handle temp pub,datum={},dataCenter={}, dataInfoId={}, version={}, sourceType={}, changeType={},isContainsUnPub={}", new Object[]{this.name, datum.hashCode(), dataCenter, dataInfoId, version, sourceType, changeType, datum.isContainsUnPub()});
            for (IDataChangeNotifier notifier : DataChangeHandler.this.dataChangeNotifiers) {
                if (!notifier.getSuitableSource().contains((Object)sourceType)) continue;
                notifier.notify(datum, null);
            }
        }
    }
}

