
[Bug] [seatunnel-flink-starter] If checkpoint is not set, data will be lost #6986

Open
3 tasks done
zhangshenghang opened this issue Jun 14, 2024 · 1 comment · May be fixed by #7040
zhangshenghang (Member) commented Jun 14, 2024

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

As described in the documentation: (screenshot omitted)

I submitted the Flink job to YARN for execution with the following command:

bin/start-seatunnel-flink-13-connector-v2.sh --config ./config/test.config --master yarn-per-job

test.config

env {
  parallelism = 2
  job.mode = "STREAMING"
#   flink.execution.checkpointing.interval=5000
}
source {
  Kafka {
    schema = {
      fields {
        comment_num = string
        insert_time = string
        user_info = {
            username = string,
            age = string
        }
      }
    }
    topic = "test-topic"
    consumer.group = "test-group"
    bootstrap.servers = "xxxx"
    kafka.config = {
      client.id = client_1
      max.poll.records = 500
      auto.offset.reset = "earliest"
      enable.auto.commit = "false"
    }
    result_table_name = "kafka_table"
  }
}



sink {
    Elasticsearch {
        source_table_name = "kafka_table"
        hosts = ["xxxx"]
        index = "test-588"
    }
}

I did not set the flink.execution.checkpointing.interval parameter in the job configuration file, and execution.checkpointing.interval is not set in the Flink configuration file either. Under these conditions, writing to Elasticsearch loses data.
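Until a default interval is applied, a workaround is to enable checkpointing explicitly in the env block. This is the same key that is commented out in test.config above; the 5000 ms interval is illustrative:

```hocon
env {
  parallelism = 2
  job.mode = "STREAMING"
  # Forces periodic Flink checkpoints, which flush the sink's buffered rows
  flink.execution.checkpointing.interval = 5000
}
```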

Because the Elasticsearch sink submits in batches of maxBatchSize, any rows still buffered are only flushed by prepareCommit, which Flink invokes during checkpointing:

    @Override
    public Optional<ElasticsearchCommitInfo> prepareCommit() {
        // Invoked on checkpoint: flushes whatever is still buffered.
        bulkEsWithRetry(this.esRestClient, this.requestEsList);
        return Optional.empty();
    }

    @Override
    public void write(SeaTunnelRow element) {
        if (RowKind.UPDATE_BEFORE.equals(element.getRowKind())) {
            return;
        }

        String indexRequestRow = seaTunnelRowSerializer.serializeRow(element);
        requestEsList.add(indexRequestRow);
        // Flushes only when the buffer reaches maxBatchSize; otherwise rows
        // wait for the next prepareCommit(), i.e. the next checkpoint.
        if (requestEsList.size() >= maxBatchSize) {
            bulkEsWithRetry(this.esRestClient, this.requestEsList);
        }
    }
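To make the failure mode concrete, here is a minimal, self-contained simulation of the buffer-and-flush logic above. This is not SeaTunnel code; the class name, the batch size of 10, and the row count of 25 are all illustrative:

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the sink's batching behaviour described above.
public class BatchFlushDemo {
    static final int MAX_BATCH_SIZE = 10;           // stands in for maxBatchSize
    static final List<String> buffer = new ArrayList<>();
    static int flushedRows = 0;

    // Mirrors write(): buffer the row, flush only when the batch is full.
    static void write(String row) {
        buffer.add(row);
        if (buffer.size() >= MAX_BATCH_SIZE) {
            flush();
        }
    }

    // Mirrors prepareCommit(): Flink calls this on checkpoint.
    static void prepareCommit() {
        flush();
    }

    static void flush() {
        flushedRows += buffer.size();
        buffer.clear();
    }

    public static void main(String[] args) {
        for (int i = 0; i < 25; i++) {
            write("row-" + i);
        }
        // Two full batches (20 rows) were flushed; 5 rows remain buffered.
        System.out.println("flushed without checkpoint: " + flushedRows);
        // Without checkpoints, prepareCommit() never runs and those 5 rows
        // are lost when the job stops. A checkpoint would flush them:
        prepareCommit();
        System.out.println("flushed after checkpoint: " + flushedRows);
    }
}
```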

The likely cause is that the SeaTunnel Flink starter does not apply a default checkpoint interval in STREAMING mode. (screenshot of the relevant code omitted)

If this is a problem, please assign me.

SeaTunnel Version

2.3.5

SeaTunnel Config

seatunnel.yaml

seatunnel:
  engine:
    history-job-expire-minutes: 1440
    backup-count: 1
    queue-type: blockingqueue
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 10000
      timeout: 60000
      storage:
        type: hdfs
        max-retained: 3
        plugin-config:
          namespace: /tmp/seatunnel/checkpoint_snapshot
          storage.type: hdfs


Running Command

bin/start-seatunnel-flink-13-connector-v2.sh --config ./config/test.config --master yarn-per-job
Error Exception

When the buffered rows do not reach maxBatchSize and no checkpoint triggers prepareCommit, the writer never flushes them, so those rows are lost.

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct.
Carl-Zhou-CN (Member) commented:

That seems to be a problem

@zhangshenghang zhangshenghang linked a pull request Jun 21, 2024 that will close this issue