The reading process typically returns the CombinedScanTask through the planTasks method and then generates RowDataIterator iterators from the CombinedScanTask to access the data

RowDataIterator iterator = new RowDataIterator(task, io, encryptionManager, schema, schema, nameMapping, caseSensitive)

Called from its parent, DataIterator

currentIterator = openTaskIterator(tasks.next());

Generate iterators to read data

  @Override
  protected CloseableIterator<RowData> openTaskIterator(FileScanTask task) { Schema partitionSchema = TypeUtil.select(projectedSchema, task.spec().identitySourceIds()); Map<Integer, ? > idToConstant = partitionSchema.columns().isEmpty() ? ImmutableMap.of() : PartitionUtil.constantsMap(task, RowDataUtil::convertConstant); FlinkDeleteFilter deletes =new FlinkDeleteFilter(task, tableSchema, projectedSchema);
    CloseableIterable<RowData> iterable = deletes.filter(newIterable(task, deletes.requiredSchema(), idToConstant));

    return iterable.iterator();
  }
Copy the code

FlinkDeleteFilter deletes = New FlinkDeleteFilter(Task, tableSchema, projectedSchema); Create FlinkDeleteFilter,

FlinkDeleteFilter constructor in the parent class of DeleteFilter

protected DeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) {
    this.setFilterThreshold = DEFAULT_SET_FILTER_THRESHOLD;
    this.dataFile = task.file();

    ImmutableList.Builder<DeleteFile> posDeleteBuilder = ImmutableList.builder();
    ImmutableList.Builder<DeleteFile> eqDeleteBuilder = ImmutableList.builder();
    for (DeleteFile delete : task.deletes()) {
      switch (delete.content()) {
        case POSITION_DELETES:
          posDeleteBuilder.add(delete);
          break;
        case EQUALITY_DELETES:
          eqDeleteBuilder.add(delete);
          break;
        default:
          throw new UnsupportedOperationException("Unknown delete file content: "+ delete.content()); }}this.posDeletes = posDeleteBuilder.build();
    this.eqDeletes = eqDeleteBuilder.build();
    this.requiredSchema = fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes);
    this.posAccessor = requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());
  }
Copy the code

Schema is built in fileProjection, requiredIds is first built, and ROW_POSITION is added to requiredIds if posDeletes exist. If eqDeletes exist, add corresponding equalityFieldIds to requiredIds, generate other field sets missingIds, and eventually add a ROW_POSITION field at the end,

private static Schema fileProjection(Schema tableSchema, Schema requestedSchema, List
       
         posDeletes, List
        
          eqDeletes)
        
        {
    if (posDeletes.isEmpty() && eqDeletes.isEmpty()) {
      return requestedSchema;
    }

    Set<Integer> requiredIds = Sets.newLinkedHashSet();
    if(! posDeletes.isEmpty()) { requiredIds.add(MetadataColumns.ROW_POSITION.fieldId()); }for (DeleteFile eqDelete : eqDeletes) {
      requiredIds.addAll(eqDelete.equalityFieldIds());
    }

    Set<Integer> missingIds = Sets.newLinkedHashSet(
        Sets.difference(requiredIds, TypeUtil.getProjectedIds(requestedSchema)));

    if (missingIds.isEmpty()) {
      return requestedSchema;
    }

    // TODO: support adding nested columns. this will currently fail when finding nested columns to add
    List<Types.NestedField> columns = Lists.newArrayList(requestedSchema.columns());
    for (int fieldId : missingIds) {
      if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
        continue; // add _pos at the end} Types.NestedField field = tableSchema.asStruct().field(fieldId); Preconditions.checkArgument(field ! =null."Cannot find required field for ID %s", fieldId);

      columns.add(field);
    }

    if (missingIds.contains(MetadataColumns.ROW_POSITION.fieldId())) {
      columns.add(MetadataColumns.ROW_POSITION);
    }

    return new Schema(columns);
  }
Copy the code

CloseableIterable

iterable = deletes.filter(newIterable(task, deletes.requiredSchema(), idToConstant)); NewIterable (Task, delt.requiredSchema (), idToConstant) is an iterator for data file data reading, according to deletes and Task generation iterators. The other DELETE data is then filtered out in the invocations of delst.filter (), and the filter is expected to have applyPosDeletes first, then applyEqDeletes, which is previously analyzed in spark reading. The posDelete file is first read into set, then put into a heap, generate an iterator, and then merge and iterate to filter out the deleted POS data. Let’s look at the logic of applyEqDeletes

  public CloseableIterable<T> filter(CloseableIterable<T> records) {
    return applyEqDeletes(applyPosDeletes(records));
  }
Copy the code
  private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
    // Predicate to test whether a row should be visible to user after applying equality deletions.
    Predicate<T> remainingRows = applyEqDeletes().stream()
        .map(Predicate::negate)
        .reduce(Predicate::and)
        .orElse(t -> true);

    Filter<T> remainingRowsFilter = new Filter<T>() {
      @Override
      protected boolean shouldKeep(T item) {
        returnremainingRows.test(item); }};return remainingRowsFilter.filter(records);
  }
Copy the code

You create a Predicate object, filter remainingRows, and then call its test method to filter it. The applyeqVariance () method is first called

  private List<Predicate<T>> applyEqDeletes() {
    List<Predicate<T>> isInDeleteSets = Lists.newArrayList();
    if (eqDeletes.isEmpty()) {
      return isInDeleteSets;
    }

    Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
    for (DeleteFile delete : eqDeletes) {
      filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
    }

    for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
      Set<Integer> ids = entry.getKey();
      Iterable<DeleteFile> deletes = entry.getValue();

      Schema deleteSchema = TypeUtil.select(requiredSchema, ids);

      // a projection to select and reorder fields of the file schema to match the delete rows
      StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema);

      Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
          delete -> openDeletes(delete, deleteSchema));
      StructLikeSet deleteSet = Deletes.toEqualitySet(
          // copy the delete records because they will be held in a set
          CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
          deleteSchema.asStruct());

      Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
      isInDeleteSets.add(isInDeleteSet);
    }

    return isInDeleteSets;
  }
Copy the code

Schema deleteSchema = typeUtil. select(requiredSchema, IDS); The schema of the fields to be read by the DELETE file is obtained, that is, the primary key. The openDeletes(delete, deleteSchema) method is formed and passed into an iterator

The openDeletes method in DeleteFilter creates an iterator that reads the Parquet file, The incoming deleteSchema as read information createReaderFunc (fileSchema – > GenericParquetReaders. BuildReader (deleteSchema, FileSchema)), an implementation class that generates ParquetValueReader from the parquet fileSchema is passed in to read the related columns

private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema deleteSchema) {
  InputFile input = getInputFile(deleteFile.path().toString());
  switch (deleteFile.format()) {
    casePARQUET: Parquet.ReadBuilder builder = Parquet.read(input) .project(deleteSchema) .reuseContainers() .createReaderFunc(fileSchema  -> GenericParquetReaders.buildReader(deleteSchema, fileSchema));if (deleteFile.content() == FileContent.POSITION_DELETES) {
        builder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), dataFile.path()));
      }

      returnbuilder.build(); }}Copy the code

Here, the DELETE files are first put into the map according to equalityFieldIds as key, the delt.toequalitySet is called in each map entry to generate a StructLikeSet object, and then the filter is constructed to return, This should be the main bottleneck of reading and combining small files. The construction needs to read EquilityDeleteFiles and then construct StructLikeSet. It takes a lot of time to read EquilityDeleteFiles from HDFS first. But in fact, only equalityFieldIds is needed, which wastes IO time and storage space when writing. According to the test results when reading, if the whole record is long, the time to construct StructLikeSet will also increase sharply. The parquet Column storage file is used for storage, which should have optimization functions of Column Pruning and Column Pruning and Project PushDown mapping, but the more testing feeling is not reflected. Then it constructs StructLikeSet, which only stores equalityFieldIds column, but it still consumes a lot of memory and the query efficiency is lower with a larger amount of data. Optimization can also be considered here.