/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.aggregate.window;

import java.time.ZoneId;
import java.util.TimeZone;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
import org.apache.flink.table.runtime.operators.window.slicing.ClockService;
import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
import org.apache.flink.table.runtime.util.TimeWindowUtil;

public class LocalSlicingWindowAggOperator
extends AbstractStreamOperator<RowData>
implements OneInputStreamOperator<RowData, RowData> {
    private static final long serialVersionUID = 1L;
    private static final ClockService CLOCK_SERVICE = ClockService.ofSystem();
    private final RowDataKeySelector keySelector;
    private final SliceAssigner sliceAssigner;
    private final long windowInterval;
    private final WindowBuffer.LocalFactory windowBufferFactory;
    private final ZoneId shiftTimezone;
    private final boolean useDayLightSaving;
    protected transient TimestampedCollector<RowData> collector;
    private transient long currentWatermark;
    private transient long nextTriggerWatermark;
    private transient WindowBuffer windowBuffer;

    public LocalSlicingWindowAggOperator(RowDataKeySelector keySelector, SliceAssigner sliceAssigner, WindowBuffer.LocalFactory windowBufferFactory, ZoneId shiftTimezone) {
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.keySelector = keySelector;
        this.sliceAssigner = sliceAssigner;
        this.windowInterval = sliceAssigner.getSliceEndInterval();
        this.windowBufferFactory = windowBufferFactory;
        this.shiftTimezone = shiftTimezone;
        this.useDayLightSaving = TimeZone.getTimeZone(shiftTimezone).useDaylightTime();
    }

    @Override
    public void open() throws Exception {
        super.open();
        this.collector = new TimestampedCollector(this.output);
        this.collector.eraseTimestamp();
        this.windowBuffer = this.windowBufferFactory.create(this.getContainingTask(), this.getContainingTask().getEnvironment().getMemoryManager(), this.computeMemorySize(), this.getRuntimeContext(), this.collector, this.shiftTimezone);
    }

    @Override
    public void processElement(StreamRecord<RowData> element) throws Exception {
        RowData inputRow = element.getValue();
        RowData key = (RowData)this.keySelector.getKey(inputRow);
        long sliceEnd = this.sliceAssigner.assignSliceEnd(inputRow, CLOCK_SERVICE);
        this.windowBuffer.addElement(key, sliceEnd, inputRow);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        if (mark.getTimestamp() > this.currentWatermark) {
            this.currentWatermark = mark.getTimestamp();
            if (this.currentWatermark >= this.nextTriggerWatermark) {
                this.windowBuffer.advanceProgress(this.currentWatermark);
                this.nextTriggerWatermark = TimeWindowUtil.getNextTriggerWatermark(this.currentWatermark, this.windowInterval, this.shiftTimezone, this.useDayLightSaving);
            }
        }
        super.processWatermark(mark);
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        this.windowBuffer.flush();
    }

    @Override
    public void close() throws Exception {
        super.close();
        this.collector = null;
        if (this.windowBuffer != null) {
            this.windowBuffer.close();
        }
    }

    private long computeMemorySize() {
        Environment environment = this.getContainingTask().getEnvironment();
        return environment.getMemoryManager().computeMemorySize(this.getOperatorConfig().getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.OPERATOR, environment.getTaskManagerInfo().getConfiguration(), environment.getUserCodeClassLoader().asClassLoader()));
    }
}

