Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resume Doesn't Resume from the Last Failed Step #1851

Open
EdIzaguirre opened this issue May 21, 2024 · 0 comments
Open

Resume Doesn't Resume from the Last Failed Step #1851

EdIzaguirre opened this issue May 21, 2024 · 0 comments

Comments

@EdIzaguirre
Copy link

EdIzaguirre commented May 21, 2024

Hi all,

I am trying to use metaflow resume to resume my flow from the last failed step. Unfortunately, it seems to resume from the step prior to the failed step instead. Why is this happening? Is this a bug? My first guess was that the @batch decorator was responsible, since the train_model step is the only one with that decorator. But after removing @batch and running the compute locally, the flow still resumed from train_model. The train_model step succeeded and the test_model step failed, yet resume always restarts train_model. For reference, here are the relevant steps:

@batch(memory=40000)
    @conda(libraries={'numpy': '1.23.5', 'tensorflow': '2.15'})
    @step
    def train_model(self):
        """
        Train models in parallel and store KPIs and path for downstream consumption

        """
        # from comet_ml import Experiment
        from model.lstm_model import get_model
        import numpy as np
        from tensorflow.keras.optimizers import Adam  # pylint: disable=import-error
        from tensorflow.keras.callbacks import EarlyStopping  # pylint: disable=import-error 
        from tensorflow.keras.preprocessing.text import Tokenizer  # pylint: disable=import-error
        from tensorflow.keras.preprocessing.sequence import pad_sequences  # pylint: disable=import-error

        # TODO: pick a sensible EXP name!!!
        self.COMET_EXP_NAME = 'my_lstm_recs'
        # define some hyper parameters for model
        self.hypers = {
            'EMBEDDING_DIM': 1,
            'LSTM_HIDDEN_DIM': 1,
            'MAX_LEN': 20,
            'LEARNING_RATE': 1e-3,
            'DROPOUT': 0.3
        }
        # init comet object for tracking
        #  comet_exp = Experiment(project_name=self.COMET_EXP_NAME)
        # linking task to experiment
        #  comet_exp.add_tag(current.pathspec)
        #  comet_exp.log_parameters(self.hypers)

        # get sessions for training
        train_sessions = self.session_dataset.x_train
        # convert to strings for keras tokenization
        train_sessions = [' '.join(s) for s in train_sessions]
        # init tokenizer
        tokenizer = Tokenizer(
            filters='',
            lower=False,
            split=' ',
            oov_token='<UNK>'
        )
        # fit on training data to initialize vocab
        tokenizer.fit_on_texts(train_sessions)
        VOCAB_SIZE = len(tokenizer.word_index)
        # convert sessions to tokens
        train_sessions_token = tokenizer.texts_to_sequences(train_sessions)
        # get N-1 items as seed
        x_train = [s[:-1] for s in train_sessions_token]
        # pad to MAX_LEN
        x_train = np.array(pad_sequences(x_train, maxlen=self.hypers['MAX_LEN']))
        # get last item as label;
        # TODO: Decrementing index here because 0 is reserved for masking; Find a better way around this.
        y_train = np.array([s[-1] - 1 for s in train_sessions_token])
        print("NUMBER OF SESSIONS : {}".format(x_train.shape[0]))
        print('First 3 x:', x_train[:3])
        print('First 3 y:', y_train[:3])

        # get model
        model = get_model(VOCAB_SIZE,
                          self.hypers['MAX_LEN'],
                          self.hypers['EMBEDDING_DIM'],
                          self.hypers['LSTM_HIDDEN_DIM'],
                          self.hypers['DROPOUT'])
        # compile model
        model.compile(optimizer=Adam(learning_rate=self.hypers['LEARNING_RATE']),
                      loss='sparse_categorical_crossentropy',
                      metrics=['acc'])
        model.summary()
        # fit model
        model.fit(x_train, y_train,
                  epochs=1,
                  verbose=2,
                  batch_size=32,
                  validation_split=0.2,
                  callbacks=[EarlyStopping(patience=20)])
        # comet_exp.end()
        # save model info
        self.model = {
            'model': model.to_json(),
            'model_weights': model.get_weights(),
            'tokenizer': tokenizer.to_json(),
            'model_config': {
                'vocab_size': VOCAB_SIZE,
                'max_len': self.hypers['MAX_LEN'],
                'embedding_dim': self.hypers['EMBEDDING_DIM'],
                'lstm_hidden_dim': self.hypers['LSTM_HIDDEN_DIM'],
                'dropout': self.hypers['DROPOUT']
            }
        }
        self.next(self.test_model)

@card(type='blank', id='recCard')
    # @batch(memory=40000)
    @conda(libraries={'numpy': '1.23.5', 'tensorflow': '2.15', 'matplotlib': '3.8.4', 'scikit-learn': '1.4'})
    @step
    def test_model(self):
        """
        Load the train model and use a custom RecList to test it
        and report its performance!
        """
        from model.lstm_model import LSTMRecModel
        import matplotlib.pyplot as plt
        import tensorflow as tf
        from sklearn.preprocessing import OneHotEncoder
        import numpy as np
        rec_model = LSTMRecModel(model_dict=self.model)
        y_preds = rec_model.predict(prediction_input=self.session_dataset.x_test)
        self.predictions = y_preds

        # Initialize the OneHotEncoder
        encoder = OneHotEncoder(sparse_output=False)

        y_true = self.session_dataset.y_test

        # Fit and transform y_true
        y_true = np.array(y_true)
        y_true_encoded = encoder.fit_transform(y_true.reshape(-1, 1))

        # Compute AUC-ROC using TensorFlow
        auc = tf.keras.metrics.AUC(curve='ROC')
        auc.update_state(y_true_encoded, y_preds)
        print('AUC-ROC:', auc.result().numpy())

        # Compute ROC curve points
        fpr, tpr, _ = tf.metrics.roc_curve(y_true_encoded, y_preds)

        # Plot the ROC curve
        fig = plt.figure()
        plt.figure()
        plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % auc.result().numpy())
        plt.plot([0, 1], [0, 1], 'k--')
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('Receiver Operating Characteristic Curve')
        plt.legend(loc="lower right")
        plt.show()

        current.card.append(Image.from_matplotlib(fig))
        self.next(self.deploy_model)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant