/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.execution.streaming.state;

import java.io.Serializable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.execution.streaming.CheckpointFileManager;
import org.apache.spark.sql.execution.streaming.CheckpointFileManager$;
import org.apache.spark.sql.execution.streaming.state.SchemaHelper;
import org.apache.spark.sql.execution.streaming.state.SchemaHelper$SchemaReader$;
import org.apache.spark.sql.execution.streaming.state.SchemaHelper$SchemaWriter$;
import org.apache.spark.sql.execution.streaming.state.StateSchemaCompatibilityChecker$;
import org.apache.spark.sql.execution.streaming.state.StateSchemaNotCompatible;
import org.apache.spark.sql.execution.streaming.state.StateStoreProviderId;
import org.apache.spark.sql.internal.SQLConf$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataType$;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.Tuple2;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0005\u0005ec\u0001\u0002\r\u001a\u0001!B\u0001\"\u000e\u0001\u0003\u0002\u0003\u0006IA\u000e\u0005\tu\u0001\u0011\t\u0011)A\u0005w!)1\t\u0001C\u0001\t\"9\u0001\n\u0001b\u0001\n\u0013I\u0005B\u0002)\u0001A\u0003%!\nC\u0004R\u0001\t\u0007I\u0011\u0002*\t\r]\u0003\u0001\u0015!\u0003T\u0011\u001dA\u0006A1A\u0005\n%Ca!\u0017\u0001!\u0002\u0013Q\u0005b\u0002.\u0001\u0005\u0004%Ia\u0017\u0005\u0007i\u0002\u0001\u000b\u0011\u0002/\t\u000bU\u0004A\u0011\u0001<\t\rU\u0004A\u0011AA\u0005\u0011\u001d\tI\u0002\u0001C\u0005\u00037Aq!!\n\u0001\t\u0003\t9\u0003C\u0004\u00020\u0001!\t!!\r\t\u0011\u0005=\u0002\u0001\"\u0001 \u0003oAq!a\u0010\u0001\t\u0013\t\teB\u0004\u0002FeA\t!a\u0012\u0007\raI\u0002\u0012AA%\u0011\u0019\u0019E\u0003\"\u0001\u0002L!I\u0011Q\n\u000bC\u0002\u0013\u0005\u0011q\n\u0005\t\u0003/\"\u0002\u0015!\u0003\u0002R\ty2\u000b^1uKN\u001b\u0007.Z7b\u0007>l\u0007/\u0019;jE&d\u0017\u000e^=DQ\u0016\u001c7.\u001a:\u000b\u0005iY\u0012!B:uCR,'B\u0001\u000f\u001e\u0003%\u0019HO]3b[&twM\u0003\u0002\u001f?\u0005IQ\r_3dkRLwN\u001c\u0006\u0003A\u0005\n1a]9m\u0015\t\u00113%A\u0003ta\u0006\u00148N\u0003\u0002%K\u00051\u0011\r]1dQ\u0016T\u0011AJ\u0001\u0004_J<7\u0001A\n\u0004\u0001%z\u0003C\u0001\u0016.\u001b\u0005Y#\"\u0001\u0017\u0002\u000bM\u001c\u0017\r\\1\n\u00059Z#AB!osJ+g\r\u0005\u00021g5\t\u0011G\u0003\u00023C\u0005A\u0011N\u001c;fe:\fG.\u0003\u00025c\t9Aj\\4hS:<\u0017A\u00039s_ZLG-\u001a:JIB\u0011q\u0007O\u0007\u00023%\u0011\u0011(\u0007\u0002\u0015'R\fG/Z*u_J,\u0007K]8wS\u0012,'/\u00133\u0002\u0015!\fGm\\8q\u0007>tg\r\u0005\u0002=\u00036\tQH\u0003\u0002?\u007f\u0005!1m\u001c8g\u0015\t\u00015%\u0001\u0004iC\u0012|w\u000e]\u0005\u0003\u0005v\u0012QbQ8oM&<WO]1uS>t\u0017A\u0002\u001fj]&$h\bF\u0002F\r\u001e\u0003\"a\u000e\u0001\t\u000bU\u001a\u0001\u0019\u0001\u001c\t\u000bi\u001a\u0001\u0019A\u001e\u0002\u001fM$xN]3Da2{7-\u0019;j_:,\u0012A\u0013\t\u0003\u0017:k\u0011\u0001\u0014\u0006\u0003\u001b~\n!AZ:\n\u0005=c%\u0001\u0002)bi\"\f\u0001c\u00
1d;pe\u0016\u001c\u0005\u000fT8dCRLwN\u001c\u0011\u0002\u0005\u0019lW#A*\u0011\u0005Q+V\"A\u000e\n\u0005Y[\"!F\"iK\u000e\\\u0007o\\5oi\u001aKG.Z'b]\u0006<WM]\u0001\u0004M6\u0004\u0013AE:dQ\u0016l\u0017MR5mK2{7-\u0019;j_:\f1c]2iK6\fg)\u001b7f\u0019>\u001c\u0017\r^5p]\u0002\nAb]2iK6\fwK]5uKJ,\u0012\u0001\u0018\t\u0003;Ft!AX8\u000f\u0005}sgB\u00011n\u001d\t\tGN\u0004\u0002cW:\u00111M\u001b\b\u0003I&t!!\u001a5\u000e\u0003\u0019T!aZ\u0014\u0002\rq\u0012xn\u001c;?\u0013\u00051\u0013B\u0001\u0013&\u0013\t\u00113%\u0003\u0002!C%\u0011adH\u0005\u00039uI!AG\u000e\n\u0005AL\u0012\u0001D*dQ\u0016l\u0017\rS3ma\u0016\u0014\u0018B\u0001:t\u00051\u00196\r[3nC^\u0013\u0018\u000e^3s\u0015\t\u0001\u0018$A\u0007tG\",W.Y,sSR,'\u000fI\u0001\u0006G\",7m\u001b\u000b\u0005oj\f)\u0001\u0005\u0002+q&\u0011\u0011p\u000b\u0002\u0005+:LG\u000fC\u0003|\u0019\u0001\u0007A0A\u0005lKf\u001c6\r[3nCB\u0019Q0!\u0001\u000e\u0003yT!a`\u0010\u0002\u000bQL\b/Z:\n\u0007\u0005\raP\u0001\u0006TiJ,8\r\u001e+za\u0016Da!a\u0002\r\u0001\u0004a\u0018a\u0003<bYV,7k\u00195f[\u0006$ra^A\u0006\u0003\u001b\ty\u0001C\u0003|\u001b\u0001\u0007A\u0010\u0003\u0004\u0002\b5\u0001\r\u0001 
\u0005\b\u0003#i\u0001\u0019AA\n\u0003EIwM\\8sKZ\u000bG.^3TG\",W.\u0019\t\u0004U\u0005U\u0011bAA\fW\t9!i\\8mK\u0006t\u0017!E:dQ\u0016l\u0017m]\"p[B\fG/\u001b2mKR1\u00111CA\u000f\u0003CAa!a\b\u000f\u0001\u0004a\u0018\u0001D:u_J,GmU2iK6\f\u0007BBA\u0012\u001d\u0001\u0007A0\u0001\u0004tG\",W.Y\u0001\u000fe\u0016\fGmU2iK6\fg)\u001b7f)\t\tI\u0003E\u0003+\u0003WaH0C\u0002\u0002.-\u0012a\u0001V;qY\u0016\u0014\u0014\u0001E2sK\u0006$XmU2iK6\fg)\u001b7f)\u00159\u00181GA\u001b\u0011\u0015Y\b\u00031\u0001}\u0011\u0019\t9\u0001\u0005a\u0001yR9q/!\u000f\u0002<\u0005u\u0002\"B>\u0012\u0001\u0004a\bBBA\u0004#\u0001\u0007A\u0010C\u0003[#\u0001\u0007A,\u0001\u0006tG\",W.\u0019$jY\u0016$2ASA\"\u0011\u0015A%\u00031\u0001K\u0003}\u0019F/\u0019;f'\u000eDW-\\1D_6\u0004\u0018\r^5cS2LG/_\"iK\u000e\\WM\u001d\t\u0003oQ\u0019\"\u0001F\u0015\u0015\u0005\u0005\u001d\u0013a\u0002,F%NKuJT\u000b\u0003\u0003#\u00022AKA*\u0013\r\t)f\u000b\u0002\u0004\u0013:$\u0018\u0001\u0003,F%NKuJ\u0014\u0011")
/*
 * Validates the key/value schema supplied for a state store against the schema
 * recorded in "<store checkpoint location>/_metadata/schema", and records the
 * supplied schema when no such file exists yet.
 *
 * This source is decompiler output for a Scala class (see the file header and the
 * ScalaSignature annotation), which is why method signatures carry no `throws`
 * clauses and why the Logging forwarders below are spelled out explicitly.
 */
public class StateSchemaCompatibilityChecker
implements Logging {
    /** Identifier of the state store provider; used for log messages and to resolve the checkpoint location. */
    private final StateStoreProviderId providerId;
    /** Checkpoint location of the store, taken from {@code providerId.storeId()}. */
    private final Path storeCpLocation;
    /** File manager created for {@link #storeCpLocation}; all file access goes through it. */
    private final CheckpointFileManager fm;
    /** Location of the schema file: {@code <storeCpLocation>/_metadata/schema}. */
    private final Path schemaFileLocation;
    /** Writer created for {@link #VERSION()}; used by the two-argument {@code createSchemaFile}. */
    private final SchemaHelper.SchemaWriter schemaWriter;
    /** Backing field for the logger slot required by {@link Logging}. */
    private transient Logger org$apache$spark$internal$Logging$$log_;

    /**
     * Returns the {@code VERSION} value exposed by the companion singleton
     * {@code StateSchemaCompatibilityChecker$}.
     */
    public static int VERSION() {
        return StateSchemaCompatibilityChecker$.MODULE$.VERSION();
    }

    // ------------------------------------------------------------------
    // Logging forwarders: each method delegates to the static
    // implementation of the same name on the Logging interface.
    // ------------------------------------------------------------------

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$(this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$(this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$(this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$(this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$(this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$(this, msg, throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$(this, msg, throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$(this, msg, throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$(this, msg, throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$(this, msg, throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$(this, isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$(this, isInterpreter, silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$(this, isInterpreter, silent);
    }

    /** Getter for the logger slot required by {@link Logging}. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /** Setter for the logger slot required by {@link Logging}. */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    // ------------------------------------------------------------------
    // Private accessors for the final fields.
    // ------------------------------------------------------------------

    private Path storeCpLocation() {
        return this.storeCpLocation;
    }

    private CheckpointFileManager fm() {
        return this.fm;
    }

    private Path schemaFileLocation() {
        return this.schemaFileLocation;
    }

    private SchemaHelper.SchemaWriter schemaWriter() {
        return this.schemaWriter;
    }

    /**
     * Checks both the key and the value schema; equivalent to
     * {@code check(keySchema, valueSchema, false)}.
     *
     * @param keySchema   key schema provided by the caller
     * @param valueSchema value schema provided by the caller
     */
    public void check(StructType keySchema, StructType valueSchema) {
        this.check(keySchema, valueSchema, false);
    }

    /**
     * Compares the provided schemas with the stored schema file, or creates that file
     * when it does not exist.
     *
     * <p>When the file exists, the call returns silently if the stored key schema equals
     * {@code keySchema} and either {@code ignoreValueSchema} is set or the stored value
     * schema equals {@code valueSchema}. Otherwise the schemas are passed through
     * {@code schemasCompatible}; an incompatible key schema, or an incompatible value
     * schema when it is not ignored, is logged and reported by exception.
     *
     * <p>When the file does not exist, it is created from {@code keySchema} and
     * {@code valueSchema}; the value schema is written even if {@code ignoreValueSchema}
     * is {@code true}.
     *
     * @param keySchema         key schema provided by the caller
     * @param valueSchema       value schema provided by the caller
     * @param ignoreValueSchema when {@code true}, the value schema is neither compared
     *                          nor mentioned in the error message
     * @throws StateSchemaNotCompatible if the provided schema is not compatible with the stored one
     * @throws MatchError               if {@link #readSchemaFile()} returns {@code null}
     */
    public void check(StructType keySchema, StructType valueSchema, boolean ignoreValueSchema) {
        if (!this.fm().exists(this.schemaFileLocation())) {
            // First use of this store: record the provided schema for later checks.
            this.logDebug((Function0<String> & Serializable) () -> "Schema file for provider " + this.providerId + " doesn't exist. Creating one.");
            this.createSchemaFile(keySchema, valueSchema);
            return;
        }

        this.logDebug((Function0<String> & Serializable) () -> "Schema file for provider " + this.providerId + " exists. Comparing with provided schema.");
        Tuple2<StructType, StructType> stored = this.readSchemaFile();
        if (stored == null) {
            // Mirrors the failure raised when the tuple cannot be destructured.
            throw new MatchError(stored);
        }
        StructType storedKeySchema = (StructType)stored._1();
        StructType storedValueSchema = (StructType)stored._2();

        // Exact match: nothing to log, nothing to verify further.
        boolean keyUnchanged = storedKeySchema.equals(keySchema);
        if (keyUnchanged && (ignoreValueSchema || storedValueSchema.equals(valueSchema))) {
            return;
        }

        // The value schema is only consulted when it is not ignored (short-circuit preserved).
        boolean incompatible = !this.schemasCompatible(storedKeySchema, keySchema)
                || (!ignoreValueSchema && !this.schemasCompatible(storedValueSchema, valueSchema));
        if (incompatible) {
            String errorMsgForKeySchema = "- Provided key schema: " + keySchema + "\n- Existing key schema: " + storedKeySchema + "\n";
            String errorMsgForValueSchema = ignoreValueSchema
                    ? ""
                    : "- Provided value schema: " + valueSchema + "\n- Existing value schema: " + storedValueSchema + "\n";
            String errorMsg = "Provided schema doesn't match to the schema for existing state! Please note that Spark allow difference of field name: check count of fields and data type of each field.\n" + errorMsgForKeySchema + errorMsgForValueSchema + "If you want to force running query without schema validation, please set " + SQLConf$.MODULE$.STATE_SCHEMA_CHECK_ENABLED().key() + " to false.\nPlease note running query with incompatible schema could cause indeterministic behavior.";
            this.logError((Function0<String> & Serializable) () -> errorMsg);
            throw new StateSchemaNotCompatible(errorMsg);
        }

        this.logInfo((Function0<String> & Serializable) () -> "Detected schema change which is compatible. Allowing to put rows.");
    }

    /**
     * Delegates to {@code DataType.equalsIgnoreNameAndCompatibleNullability}, passing the
     * provided schema as the first argument and the stored schema as the second.
     *
     * @param storedSchema schema read from the schema file
     * @param schema       schema provided by the caller
     * @return the result of the delegated comparison
     */
    private boolean schemasCompatible(StructType storedSchema, StructType schema) {
        return DataType$.MODULE$.equalsIgnoreNameAndCompatibleNullability((DataType)schema, (DataType)storedSchema);
    }

    /**
     * Reads the schema file: a UTF version string first, then the schemas via the
     * {@code SchemaReader} created for that version. The stream is closed by the
     * try-with-resources statement; any failure while reading is logged and rethrown.
     *
     * @return the pair produced by the version-specific reader (used by {@code check}
     *         as stored key schema, stored value schema)
     */
    public Tuple2<StructType, StructType> readSchemaFile() {
        try (FSDataInputStream inStream = this.fm().open(this.schemaFileLocation())) {
            try {
                String versionStr = inStream.readUTF();
                SchemaHelper.SchemaReader schemaReader = SchemaHelper$SchemaReader$.MODULE$.createSchemaReader(versionStr);
                return schemaReader.read(inStream);
            }
            catch (Throwable e) {
                this.logError((Function0<String> & Serializable) () -> "Fail to read schema file from " + this.schemaFileLocation(), e);
                throw e;
            }
        }
    }

    /**
     * Writes the schema file using the writer created in the constructor.
     *
     * @param keySchema   key schema to record
     * @param valueSchema value schema to record
     */
    public void createSchemaFile(StructType keySchema, StructType valueSchema) {
        this.createSchemaFile(keySchema, valueSchema, this.schemaWriter());
    }

    /**
     * Writes the schema file through an atomic output stream obtained from the file
     * manager. On any failure (including a failing {@code close()}) the error is logged,
     * the stream is cancelled, and the failure is rethrown.
     *
     * @param keySchema    key schema to record
     * @param valueSchema  value schema to record
     * @param schemaWriter writer that serializes the schemas into the stream
     */
    public void createSchemaFile(StructType keySchema, StructType valueSchema, SchemaHelper.SchemaWriter schemaWriter) {
        // NOTE(review): the `false` argument is presumably an overwrite flag - confirm against CheckpointFileManager.
        CheckpointFileManager.CancellableFSDataOutputStream outStream = this.fm().createAtomic(this.schemaFileLocation(), false);
        try {
            schemaWriter.write(keySchema, valueSchema, outStream);
            outStream.close();
        }
        catch (Throwable e) {
            this.logError((Function0<String> & Serializable) () -> "Fail to write schema file to " + this.schemaFileLocation(), e);
            outStream.cancel();
            throw e;
        }
    }

    /**
     * Builds the schema file path {@code <storeCpLocation>/_metadata/schema}.
     *
     * @param storeCpLocation checkpoint location of the store
     * @return path of the schema file below that location
     */
    private Path schemaFile(Path storeCpLocation) {
        Path metadataDir = new Path(storeCpLocation, "_metadata");
        return new Path(metadataDir, "schema");
    }

    /**
     * Resolves the store checkpoint location from {@code providerId}, creates the file
     * manager and schema writer, and creates the parent directory of the schema file.
     *
     * @param providerId identifier of the state store provider
     * @param hadoopConf Hadoop configuration handed to the file manager factory
     */
    public StateSchemaCompatibilityChecker(StateStoreProviderId providerId, Configuration hadoopConf) {
        this.providerId = providerId;
        Logging.$init$(this);
        this.storeCpLocation = providerId.storeId().storeCheckpointLocation();
        this.fm = CheckpointFileManager$.MODULE$.create(this.storeCpLocation(), hadoopConf);
        this.schemaFileLocation = this.schemaFile(this.storeCpLocation());
        this.schemaWriter = SchemaHelper$SchemaWriter$.MODULE$.createSchemaWriter(StateSchemaCompatibilityChecker$.MODULE$.VERSION());
        // Make sure the "_metadata" directory exists before any schema file is written.
        this.fm().mkdirs(this.schemaFileLocation().getParent());
    }
}

