/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.execution.streaming;

import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.util.Iterator;
import java.util.TreeMap;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.streaming.EventTimeWatermarkExec;
import org.apache.spark.sql.execution.streaming.PropagateWatermarkSimulator$;
import org.apache.spark.sql.execution.streaming.StateStoreWriter;
import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo;
import org.apache.spark.sql.execution.streaming.WatermarkPropagator;
import org.apache.spark.sql.internal.SQLConf$;
import org.apache.spark.util.Utils$;
import org.slf4j.Logger;
import scala.;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.PartialFunction;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.MapOps;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.HashMap$;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0005\u0005\u001db\u0001\u0002\b\u0010\u0001qAQ!\f\u0001\u0005\u00029Bq\u0001\r\u0001C\u0002\u0013%\u0011\u0007\u0003\u0004>\u0001\u0001\u0006IA\r\u0005\b}\u0001\u0011\r\u0011\"\u0003@\u0011\u0019)\u0006\u0001)A\u0005\u0001\")a\u000b\u0001C\u0005/\")Q\f\u0001C\u0005=\")A\u000f\u0001C\u0005k\")a\u0010\u0001C!\u007f\"9\u0011q\u0001\u0001\u0005\n\u0005%\u0001bBA\t\u0001\u0011\u0005\u00131\u0003\u0005\b\u00033\u0001A\u0011IA\u000e\u0011\u001d\t\t\u0003\u0001C!\u0003G\u00111\u0004\u0015:pa\u0006<\u0017\r^3XCR,'/\\1sWNKW.\u001e7bi>\u0014(B\u0001\t\u0012\u0003%\u0019HO]3b[&twM\u0003\u0002\u0013'\u0005IQ\r_3dkRLwN\u001c\u0006\u0003)U\t1a]9m\u0015\t1r#A\u0003ta\u0006\u00148N\u0003\u0002\u00193\u00051\u0011\r]1dQ\u0016T\u0011AG\u0001\u0004_J<7\u0001A\n\u0005\u0001u\u0019s\u0005\u0005\u0002\u001fC5\tqDC\u0001!\u0003\u0015\u00198-\u00197b\u0013\t\u0011sD\u0001\u0004B]f\u0014VM\u001a\t\u0003I\u0015j\u0011aD\u0005\u0003M=\u00111cV1uKJl\u0017M]6Qe>\u0004\u0018mZ1u_J\u0004\"\u0001K\u0016\u000e\u0003%R!AK\u000b\u0002\u0011%tG/\u001a:oC2L!\u0001L\u0015\u0003\u000f1{wmZ5oO\u00061A(\u001b8jiz\"\u0012a\f\t\u0003I\u0001\t!CY1uG\"LE\rV8XCR,'/\\1sWV\t!\u0007\u0005\u00034qiRT\"\u0001\u001b\u000b\u0005U2\u0014\u0001B;uS2T\u0011aN\u0001\u0005U\u00064\u0018-\u0003\u0002:i\t9AK]3f\u001b\u0006\u0004\bC\u0001\u0010<\u0013\tatD\u0001\u0003M_:<\u0017a\u00052bi\u000eD\u0017\n\u001a+p/\u0006$XM]7be.\u0004\u0013aD5oaV$x+\u0019;fe6\f'o[:\u0016\u0003\u0001\u0003B!\u0011$;\u00116\t!I\u0003\u0002D\t\u00069Q.\u001e;bE2,'BA# \u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u0003\u000f\n\u00131!T1q!\u0011I\u0005K\u000f*\u000f\u0005)s\u0005CA& \u001b\u0005a%BA'\u001c\u0003\u0019a$o\\8u}%\u0011qjH\u0001\u0007!J,G-\u001a4\n\u0005\u001d\u000b&BA( 
!\rq2KO\u0005\u0003)~\u0011aa\u00149uS>t\u0017\u0001E5oaV$x+\u0019;fe6\f'o[:!\u00035I7/\u00138ji&\fG.\u001b>fIR\u0011\u0001l\u0017\t\u0003=eK!AW\u0010\u0003\u000f\t{w\u000e\\3b]\")AL\u0002a\u0001u\u00059!-\u0019;dQ&#\u0017AE4fi&s\u0007/\u001e;XCR,'/\\1sWN$2a\u00185o!\r\u0001WM\u000f\b\u0003C\u000et!a\u00132\n\u0003\u0001J!\u0001Z\u0010\u0002\u000fA\f7m[1hK&\u0011am\u001a\u0002\u0004'\u0016\f(B\u00013 \u0011\u0015Iw\u00011\u0001k\u0003\u0011qw\u000eZ3\u0011\u0005-dW\"A\t\n\u00055\f\"!C*qCJ\\\u0007\u000b\\1o\u0011\u0015yw\u00011\u0001q\u0003Uqw\u000eZ3U_>+H\u000f];u/\u0006$XM]7be.\u0004B!\u0011$r%B\u0011aD]\u0005\u0003g~\u00111!\u00138u\u0003)!wnU5nk2\fG/\u001a\u000b\u0005mfTH\u0010\u0005\u0002\u001fo&\u0011\u0001p\b\u0002\u0005+:LG\u000fC\u0003]\u0011\u0001\u0007!\bC\u0003|\u0011\u0001\u0007!.\u0001\u0003qY\u0006t\u0007\"B?\t\u0001\u0004Q\u0014aD8sS\u001eLgnV1uKJl\u0017M]6\u0002\u0013A\u0014x\u000e]1hCR,Gc\u0002<\u0002\u0002\u0005\r\u0011Q\u0001\u0005\u00069&\u0001\rA\u000f\u0005\u0006w&\u0001\rA\u001b\u0005\u0006{&\u0001\rAO\u0001\u0012O\u0016$\u0018J\u001c9vi^\u000bG/\u001a:nCJ\\G#\u0002\u001e\u0002\f\u00055\u0001\"\u0002/\u000b\u0001\u0004Q\u0004BBA\b\u0015\u0001\u0007!(A\u0005ti\u0006$Xm\u00149JI\u0006qr-\u001a;J]B,HoV1uKJl\u0017M]6G_Jd\u0015\r^3Fm\u0016tGo\u001d\u000b\u0006u\u0005U\u0011q\u0003\u0005\u00069.\u0001\rA\u000f\u0005\u0007\u0003\u001fY\u0001\u0019\u0001\u001e\u00029\u001d,G/\u00138qkR<\u0016\r^3s[\u0006\u00148NR8s\u000bZL7\r^5p]R)!(!\b\u0002 !)A\f\u0004a\u0001u!1\u0011q\u0002\u0007A\u0002i\nQ\u0001];sO\u0016$2A^A\u0013\u0011\u0015aV\u00021\u0001;\u0001")
public class PropagateWatermarkSimulator
implements WatermarkPropagator,
Logging {
    // Sorted map keyed by boxed batch ID; the value is the boxed origin (global)
    // watermark recorded for that batch by propagate()/doSimulate().
    private final TreeMap<Object, Object> batchIdToWatermark;
    // Keyed by boxed batch ID; each value maps a boxed stateful operator ID to the
    // optional input watermark computed for that operator in that batch.
    private final Map<Object, scala.collection.immutable.Map<Object, Option<Object>>> inputWatermarks;
    // Backing field for the Logging trait's logger; read and written only through the
    // org$apache$spark$internal$Logging$$log_ accessor pair below.
    private transient Logger org$apache$spark$internal$Logging$$log_;

    /**
     * Returns the logger name as computed by the {@code Logging} trait's static forwarder.
     *
     * @return the value produced by {@code Logging.logName$} for this instance
     */
    public String logName() {
        final String name = Logging.logName$(this);
        return name;
    }

    /**
     * Returns the logger for this instance by delegating to {@code Logging.log$}.
     *
     * @return the logger supplied by the {@code Logging} trait
     */
    public Logger log() {
        final Logger logger = Logging.log$(this);
        return logger;
    }

    /**
     * Forwards {@code msg} to the {@code Logging} trait's {@code logInfo$} implementation.
     *
     * @param msg zero-argument function producing the message text
     */
    public void logInfo(Function0<String> msg) {
        Logging.logInfo$(this, msg);
    }

    /**
     * Forwards {@code msg} to the {@code Logging} trait's {@code logDebug$} implementation.
     *
     * @param msg zero-argument function producing the message text
     */
    public void logDebug(Function0<String> msg) {
        Logging.logDebug$(this, msg);
    }

    /**
     * Forwards {@code msg} to the {@code Logging} trait's {@code logTrace$} implementation.
     *
     * @param msg zero-argument function producing the message text
     */
    public void logTrace(Function0<String> msg) {
        Logging.logTrace$(this, msg);
    }

    /**
     * Forwards {@code msg} to the {@code Logging} trait's {@code logWarning$} implementation.
     *
     * @param msg zero-argument function producing the message text
     */
    public void logWarning(Function0<String> msg) {
        Logging.logWarning$(this, msg);
    }

    /**
     * Forwards {@code msg} to the {@code Logging} trait's {@code logError$} implementation.
     *
     * @param msg zero-argument function producing the message text
     */
    public void logError(Function0<String> msg) {
        Logging.logError$(this, msg);
    }

    /**
     * Forwards {@code msg} and {@code throwable} to the {@code Logging} trait's
     * three-argument {@code logInfo$} implementation.
     *
     * @param msg       zero-argument function producing the message text
     * @param throwable the throwable passed along with the message
     */
    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$(this, msg, throwable);
    }

    /**
     * Forwards {@code msg} and {@code throwable} to the {@code Logging} trait's
     * three-argument {@code logDebug$} implementation.
     *
     * @param msg       zero-argument function producing the message text
     * @param throwable the throwable passed along with the message
     */
    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$(this, msg, throwable);
    }

    /**
     * Forwards {@code msg} and {@code throwable} to the {@code Logging} trait's
     * three-argument {@code logTrace$} implementation.
     *
     * @param msg       zero-argument function producing the message text
     * @param throwable the throwable passed along with the message
     */
    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$(this, msg, throwable);
    }

    /**
     * Forwards {@code msg} and {@code throwable} to the {@code Logging} trait's
     * three-argument {@code logWarning$} implementation.
     *
     * @param msg       zero-argument function producing the message text
     * @param throwable the throwable passed along with the message
     */
    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$(this, msg, throwable);
    }

    /**
     * Forwards {@code msg} and {@code throwable} to the {@code Logging} trait's
     * three-argument {@code logError$} implementation.
     *
     * @param msg       zero-argument function producing the message text
     * @param throwable the throwable passed along with the message
     */
    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$(this, msg, throwable);
    }

    /**
     * Reports the result of the {@code Logging} trait's {@code isTraceEnabled$} for this instance.
     *
     * @return whatever {@code Logging.isTraceEnabled$} returns
     */
    public boolean isTraceEnabled() {
        final boolean traceEnabled = Logging.isTraceEnabled$(this);
        return traceEnabled;
    }

    /**
     * Delegates to the single-flag {@code Logging.initializeLogIfNecessary$} forwarder.
     *
     * @param isInterpreter flag handed through unchanged to the {@code Logging} trait
     */
    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$(this, isInterpreter);
    }

    /**
     * Delegates to the two-flag {@code Logging.initializeLogIfNecessary$} forwarder.
     *
     * @param isInterpreter flag handed through unchanged to the {@code Logging} trait
     * @param silent        flag handed through unchanged to the {@code Logging} trait
     * @return the boolean returned by the {@code Logging} trait implementation
     */
    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        final boolean result = Logging.initializeLogIfNecessary$(this, isInterpreter, silent);
        return result;
    }

    /**
     * Supplies the default value of the second parameter of {@code initializeLogIfNecessary},
     * as defined by the {@code Logging} trait.
     *
     * @return the default returned by {@code Logging.initializeLogIfNecessary$default$2$}
     */
    public boolean initializeLogIfNecessary$default$2() {
        final boolean defaultValue = Logging.initializeLogIfNecessary$default$2$(this);
        return defaultValue;
    }

    /**
     * Delegates to the {@code Logging} trait's {@code initializeForcefully$} forwarder.
     *
     * @param isInterpreter flag handed through unchanged to the {@code Logging} trait
     * @param silent        flag handed through unchanged to the {@code Logging} trait
     */
    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$(this, isInterpreter, silent);
    }

    /**
     * Accessor for the transient logger field backing the {@code Logging} trait.
     *
     * @return the current value of the logger field (may be unset)
     */
    public Logger org$apache$spark$internal$Logging$$log_() {
        final Logger current = this.org$apache$spark$internal$Logging$$log_;
        return current;
    }

    /**
     * Mutator for the transient logger field backing the {@code Logging} trait.
     *
     * @param x$1 the logger to store
     */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        final Logger replacement = x$1;
        this.org$apache$spark$internal$Logging$$log_ = replacement;
    }

    /**
     * Accessor for the sorted batch-ID-to-origin-watermark map.
     *
     * @return the live (mutable) map instance held by this object
     */
    private TreeMap<Object, Object> batchIdToWatermark() {
        final TreeMap<Object, Object> watermarksByBatch = this.batchIdToWatermark;
        return watermarksByBatch;
    }

    /**
     * Accessor for the per-batch map of stateful-operator input watermarks.
     *
     * @return the live (mutable) map instance held by this object
     */
    private Map<Object, scala.collection.immutable.Map<Object, Option<Object>>> inputWatermarks() {
        final Map<Object, scala.collection.immutable.Map<Object, Option<Object>>> perBatch = this.inputWatermarks;
        return perBatch;
    }

    /**
     * Tells whether a watermark has already been recorded for the given batch.
     *
     * @param batchId the batch ID to look up
     * @return {@code true} if {@code batchIdToWatermark} contains an entry for {@code batchId}
     */
    private boolean isInitialized(long batchId) {
        final TreeMap<Object, Object> recorded = this.batchIdToWatermark();
        return recorded.containsKey(BoxesRunTime.boxToLong(batchId));
    }

    /**
     * Gathers the output watermarks already registered for each child of {@code node}.
     *
     * <p>Each child's ID is looked up in {@code nodeToOutputWatermark}; the resulting
     * {@code Option} values are flattened, so children registered with an empty option
     * contribute nothing to the returned sequence.
     *
     * @param node                  the plan node whose children are inspected
     * @param nodeToOutputWatermark map from node ID to that node's optional output watermark
     * @return the defined child output watermarks
     * @throws IllegalStateException if a child has no entry in {@code nodeToOutputWatermark}
     */
    public Seq<Object> org$apache$spark$sql$execution$streaming$PropagateWatermarkSimulator$$getInputWatermarks(SparkPlan node, Map<Object, Option<Object>> nodeToOutputWatermark) {
        return (Seq)node.children().flatMap((Function1 & Serializable)child -> {
            Object childKey = (Object)BoxesRunTime.boxToInteger((int)child.id());
            // Only invoked when the child is missing from the map.
            Function0 failOnMissing = (Function0 & Serializable)() -> {
                throw new IllegalStateException("watermark for the node " + child.id() + " should be registered");
            };
            return (Option)nodeToOutputWatermark.getOrElse(childKey, failOnMissing);
        });
    }

    /**
     * Walks {@code plan} and records, for the given batch, the input watermark of every
     * stateful operator, then caches the batch's origin watermark.
     *
     * <p>Two scratch maps are filled during the traversal: node ID to optional output
     * watermark, and stateful operator ID to optional input watermark. Afterwards an
     * immutable copy of the second map is stored in {@code inputWatermarks} and
     * {@code originWatermark} is stored in {@code batchIdToWatermark}, both under
     * {@code batchId}.
     *
     * @param batchId         the batch the simulation is for
     * @param plan            the physical plan to traverse
     * @param originWatermark the watermark assigned as the output of every
     *                        {@code EventTimeWatermarkExec} node
     */
    private void doSimulate(long batchId, SparkPlan plan, long originWatermark) {
        // node ID -> optional output watermark of that node
        HashMap nodeToOutputWatermark = (HashMap)HashMap$.MODULE$.apply((Seq)Nil$.MODULE$);
        // stateful operator ID -> optional input watermark of that operator
        HashMap nextStatefulOperatorToWatermark = (HashMap)HashMap$.MODULE$.apply((Seq)Nil$.MODULE$);
        // NOTE(review): transformUp presumably visits children before parents; the child
        // lookups in getInputWatermarks throw if a child is not yet registered — confirm.
        plan.transformUp((PartialFunction)new Serializable(this, nodeToOutputWatermark, originWatermark, nextStatefulOperatorToWatermark){
            private static final long serialVersionUID = 0L;
            private final /* synthetic */ PropagateWatermarkSimulator $outer;
            private final HashMap nodeToOutputWatermark$2;
            private final long originWatermark$2;
            private final HashMap nextStatefulOperatorToWatermark$1;

            // Handles one plan node; every branch returns the node itself unchanged, so the
            // traversal is used only for its side effects on the two maps.
            public final <A1 extends SparkPlan, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                A1 A1 = x1;
                if (A1 instanceof EventTimeWatermarkExec) {
                    EventTimeWatermarkExec eventTimeWatermarkExec = (EventTimeWatermarkExec)A1;
                    Seq<Object> inputWatermarks = this.$outer.org$apache$spark$sql$execution$streaming$PropagateWatermarkSimulator$$getInputWatermarks(eventTimeWatermarkExec, (Map<Object, Option<Object>>)this.nodeToOutputWatermark$2);
                    // A watermark node that already receives a watermark from below is rejected.
                    if (inputWatermarks.nonEmpty()) {
                        throw new AnalysisException("_LEGACY_ERROR_TEMP_3076", (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"config"), (Object)SQLConf$.MODULE$.STATEFUL_OPERATOR_ALLOW_MULTIPLE().key())})));
                    }
                    // The watermark node emits the origin watermark.
                    this.nodeToOutputWatermark$2.put((Object)BoxesRunTime.boxToInteger((int)eventTimeWatermarkExec.id()), (Object)new Some((Object)BoxesRunTime.boxToLong((long)this.originWatermark$2)));
                    return (B1)eventTimeWatermarkExec;
                }
                if (A1 instanceof StateStoreWriter) {
                    StateStoreWriter stateStoreWriter = (StateStoreWriter)((Object)A1);
                    long stOpId = ((StatefulOperatorStateInfo)stateStoreWriter.stateInfo().get()).operatorId();
                    Seq<Object> inputWatermarks = this.$outer.org$apache$spark$sql$execution$streaming$PropagateWatermarkSimulator$$getInputWatermarks((SparkPlan)((Object)stateStoreWriter), (Map<Object, Option<Object>>)this.nodeToOutputWatermark$2);
                    // Input watermark is the minimum over the children's defined watermarks, or None.
                    None$ finalInputWatermarkMs = inputWatermarks.nonEmpty() ? new Some(inputWatermarks.min((Ordering)Ordering.Long$.MODULE$)) : None$.MODULE$;
                    // The operator decides its own output watermark from its input watermark.
                    Option outputWatermarkMs = finalInputWatermarkMs.flatMap((Function1 & Serializable)wm -> $anonfun$doSimulate$1.$anonfun$applyOrElse$1(stateStoreWriter, BoxesRunTime.unboxToLong((Object)wm)));
                    this.nodeToOutputWatermark$2.put((Object)BoxesRunTime.boxToInteger((int)((SparkPlan)((Object)stateStoreWriter)).id()), (Object)outputWatermarkMs);
                    this.nextStatefulOperatorToWatermark$1.put((Object)BoxesRunTime.boxToLong((long)stOpId), (Object)finalInputWatermarkMs);
                    return (B1)stateStoreWriter;
                }
                // Any other node passes through the minimum of its children's watermarks (or None).
                Seq<Object> inputWatermarks = this.$outer.org$apache$spark$sql$execution$streaming$PropagateWatermarkSimulator$$getInputWatermarks(A1, (Map<Object, Option<Object>>)this.nodeToOutputWatermark$2);
                None$ finalInputWatermarkMs = inputWatermarks.nonEmpty() ? new Some(inputWatermarks.min((Ordering)Ordering.Long$.MODULE$)) : None$.MODULE$;
                this.nodeToOutputWatermark$2.put((Object)BoxesRunTime.boxToInteger((int)A1.id()), (Object)finalInputWatermarkMs);
                return (B1)A1;
            }

            // Returns true on every path: the partial function is defined for all nodes.
            public final boolean isDefinedAt(SparkPlan x1) {
                SparkPlan sparkPlan = x1;
                if (sparkPlan instanceof EventTimeWatermarkExec) {
                    return true;
                }
                if (sparkPlan instanceof StateStoreWriter) {
                    return true;
                }
                return true;
            }

            // Lambda body used above: asks the stateful operator for its output watermark.
            public static final /* synthetic */ Option $anonfun$applyOrElse$1(StateStoreWriter x3$1, long wm) {
                return x3$1.produceOutputWatermark(wm);
            }
            // Synthetic initializer: captures the enclosing instance and the closed-over locals.
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.nodeToOutputWatermark$2 = nodeToOutputWatermark$2;
                this.originWatermark$2 = originWatermark$2;
                this.nextStatefulOperatorToWatermark$1 = nextStatefulOperatorToWatermark$1;
            }

            // Compiler-generated hook for deserializing the lambda above.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$applyOrElse$1$adapted(org.apache.spark.sql.execution.streaming.StateStoreWriter java.lang.Object )}, serializedLambda);
            }
        });
        // Publish the results for this batch: immutable copy of the per-operator input
        // watermarks first, then the origin watermark (which marks the batch as initialized).
        this.inputWatermarks().put((Object)BoxesRunTime.boxToLong((long)batchId), (Object)nextStatefulOperatorToWatermark.toMap((.less.colon.less)$less$colon$less$.MODULE$.refl()));
        this.batchIdToWatermark().put(BoxesRunTime.boxToLong((long)batchId), BoxesRunTime.boxToLong((long)originWatermark));
        this.logDebug((Function0<String>)(Function0 & Serializable)() -> "global watermark for batch ID " + batchId + " is set to " + originWatermark);
        this.logDebug((Function0<String>)(Function0 & Serializable)() -> "input watermarks for batch ID " + batchId + " is set to " + nextStatefulOperatorToWatermark);
    }

    /**
     * Records the watermark for a batch and, when needed, simulates its propagation
     * through {@code plan}.
     *
     * <ul>
     *   <li>Negative batch IDs are ignored.</li>
     *   <li>If the batch was already recorded, only asserts that the cached watermark
     *       equals {@code originWatermark}.</li>
     *   <li>If {@code originWatermark} is 0, stores 0 and an empty input-watermark map
     *       for the batch without traversing the plan.</li>
     *   <li>Otherwise delegates to {@code doSimulate}.</li>
     * </ul>
     *
     * @param batchId         the batch ID the watermark belongs to
     * @param plan            the physical plan to simulate propagation on
     * @param originWatermark the origin watermark for the batch
     */
    @Override
    public void propagate(long batchId, SparkPlan plan, long originWatermark) {
        if (batchId >= 0L) {
            if (this.isInitialized(batchId)) {
                // Same batch seen again: the watermark must not have changed.
                final long cachedWatermark = BoxesRunTime.unboxToLong((Object)this.batchIdToWatermark().get(BoxesRunTime.boxToLong(batchId)));
                Predef$.MODULE$.assert(cachedWatermark == originWatermark, (Function0 & Serializable)() -> "Watermark has been changed for the same batch ID! Batch ID: " + batchId + ", Value in cache: " + cachedWatermark + ", value given: " + originWatermark);
            } else {
                this.logDebug((Function0<String>)(Function0 & Serializable)() -> "watermark for batch ID " + batchId + " is received as " + originWatermark + ", call site: " + Utils$.MODULE$.getCallSite(Utils$.MODULE$.getCallSite$default$1()).longForm());
                if (originWatermark != 0L) {
                    this.doSimulate(batchId, plan, originWatermark);
                } else {
                    this.logDebug((Function0<String>)(Function0 & Serializable)() -> "skipping the propagation for batch " + batchId + " as origin watermark is 0.");
                    this.batchIdToWatermark().put(BoxesRunTime.boxToLong(batchId), BoxesRunTime.boxToLong(0L));
                    this.inputWatermarks().put((Object)BoxesRunTime.boxToLong(batchId), (Object)Predef$.MODULE$.Map().empty());
                }
            }
        }
    }

    /**
     * Looks up the input watermark recorded for a stateful operator in a batch.
     *
     * @param batchId   the batch ID; negative values yield 0
     * @param stateOpId the stateful operator ID
     * @return the recorded watermark clamped to be non-negative; 0 when the operator's
     *         entry is an empty option, or when the operator has no entry and the
     *         batch's origin watermark is 0
     * @throws IllegalStateException if the operator has no entry although the batch's
     *                               origin watermark is non-zero
     */
    private long getInputWatermark(long batchId, long stateOpId) {
        if (batchId < 0L) {
            return 0L;
        }
        Predef$.MODULE$.assert(this.isInitialized(batchId), (Function0 & Serializable)() -> "Watermark for batch ID " + batchId + " is not yet set!");
        MapOps perOperator = (MapOps)this.inputWatermarks().apply((Object)BoxesRunTime.boxToLong(batchId));
        Option lookup = perOperator.get((Object)BoxesRunTime.boxToLong(stateOpId));
        if (lookup instanceof Some) {
            Option recorded = (Option)((Some)lookup).value();
            long watermark = BoxesRunTime.unboxToLong((Object)recorded.getOrElse((Function0)(JFunction0.mcJ.sp & Serializable)() -> 0L));
            return Math.max(watermark, 0L);
        }
        if (!None$.MODULE$.equals(lookup)) {
            throw new MatchError((Object)lookup);
        }
        // No entry for this operator: acceptable only when propagation was skipped
        // because the batch's origin watermark is 0.
        long originWatermark = BoxesRunTime.unboxToLong((Object)this.batchIdToWatermark().get(BoxesRunTime.boxToLong(batchId)));
        if (originWatermark != 0L) {
            throw new IllegalStateException("Watermark for batch ID " + batchId + " and stateOpId " + stateOpId + " is not yet set!");
        }
        return 0L;
    }

    /**
     * Returns the input watermark used for late events: the one recorded for the
     * batch preceding {@code batchId}.
     *
     * @param batchId   the current batch ID
     * @param stateOpId the stateful operator ID
     * @return the input watermark of batch {@code batchId - 1} for the operator
     */
    @Override
    public long getInputWatermarkForLateEvents(long batchId, long stateOpId) {
        final long previousBatchId = batchId - 1L;
        return this.getInputWatermark(previousBatchId, stateOpId);
    }

    /**
     * Returns the input watermark used for eviction: the one recorded for
     * {@code batchId} itself.
     *
     * @param batchId   the current batch ID
     * @param stateOpId the stateful operator ID
     * @return the input watermark of batch {@code batchId} for the operator
     */
    @Override
    public long getInputWatermarkForEviction(long batchId, long stateOpId) {
        final long watermark = this.getInputWatermark(batchId, stateOpId);
        return watermark;
    }

    /**
     * Drops all cached watermark information for batches up to and including
     * {@code batchId}.
     *
     * <p>Keys of {@code batchIdToWatermark} are visited in ascending order (it is a
     * {@code TreeMap}), so iteration stops at the first key greater than
     * {@code batchId}. Each removed batch is also removed from {@code inputWatermarks}.
     *
     * @param batchId the highest batch ID to purge (inclusive)
     */
    @Override
    public void purge(long batchId) {
        Iterator<Object> keyIter = this.batchIdToWatermark().keySet().iterator();
        while (keyIter.hasNext()) {
            long currKey = BoxesRunTime.unboxToLong((Object)keyIter.next());
            if (currKey > batchId) {
                // Remaining keys are larger still; nothing more to purge.
                break;
            }
            // Remove through the iterator to avoid a ConcurrentModificationException.
            keyIter.remove();
            this.inputWatermarks().remove((Object)BoxesRunTime.boxToLong(currKey));
        }
    }

    /**
     * Creates a simulator with no recorded batches: runs the {@code Logging} trait
     * initializer, then sets up an empty sorted batch-to-watermark map and an empty
     * mutable per-batch input-watermark map.
     */
    public PropagateWatermarkSimulator() {
        Logging.$init$(this);
        this.batchIdToWatermark = new TreeMap<>();
        this.inputWatermarks = (Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$);
    }
}

