Preface

This article focuses on Flink’s Table Formats.

Examples

CSV Format

.withFormat(
  new Csv()
    .field("field1", Types.STRING)    // required: ordered format fields
    .field("field2", Types.TIMESTAMP)
    .fieldDelimiter(",")              // optional: string delimiter "," by default
    .lineDelimiter("\n")              // optional: string delimiter "\n" by default
    .quoteCharacter('"')              // optional: single character for string values, empty by default
    .commentPrefix("#")               // optional: string to indicate comments, empty by default
    .ignoreFirstLine()                // optional: ignore the first line, by default it is not skipped
    .ignoreParseErrors()              // optional: skip records with parse error instead of failing by default
)
  • Flink supports CSV format built-in without additional dependencies

JSON Format

.withFormat(
  new Json()
    .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default

    // required: define the schema either by using type information which parses numbers to corresponding types
    .schema(Type.ROW(...))

    // or by using a JSON schema which parses to DECIMAL and TIMESTAMP
    .jsonSchema(
      "{" +
      "  type: 'object'," +
      "  properties: {" +
      "    lon: {" +
      "      type: 'number'" +
      "    }," +
      "    rideTime: {" +
      "      type: 'string'," +
      "      format: 'date-time'" +
      "    }" +
      "  }" +
      "}"
    )

    // or use the table's schema
    .deriveSchema()
)
  • The JSON format can be defined using schema, jsonSchema, or deriveSchema; the flink-json dependency must be added

Apache Avro Format

.withFormat(
  new Avro()

    // required: define the schema either by using an Avro specific record class
    .recordClass(User.class)

    // or by using an Avro schema
    .avroSchema(
      "{" +
      "  \"type\": \"record\"," +
      "  \"name\": \"test\"," +
      "  \"fields\": [" +
      "    {\"name\": \"a\", \"type\": \"long\"}," +
      "    {\"name\": \"b\", \"type\": \"string\"}" +
      "  ]" +
      "}"
    )
)
  • The Avro format can be defined using recordClass or avroSchema; the flink-avro dependency must be added

ConnectTableDescriptor

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala

abstract class ConnectTableDescriptor[D <: ConnectTableDescriptor[D]](
    private val tableEnv: TableEnvironment,
    private val connectorDescriptor: ConnectorDescriptor)
  extends TableDescriptor
  with SchematicDescriptor[D]
  with RegistrableDescriptor { this: D =>

  private var formatDescriptor: Option[FormatDescriptor] = None
  private var schemaDescriptor: Option[Schema] = None

  //......

  override def withFormat(format: FormatDescriptor): D = {
    formatDescriptor = Some(format)
    this
  }

  //......
}
  • StreamTableEnvironment’s connect method creates a StreamTableDescriptor; StreamTableDescriptor extends ConnectTableDescriptor; ConnectTableDescriptor provides a withFormat method that accepts a FormatDescriptor and returns the descriptor itself for chaining
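The Scala self-type annotation `this: D =>` is what lets withFormat return the concrete subclass type, so chained calls keep access to subclass-specific methods. A minimal stand-alone Java sketch of the same F-bounded pattern (all class and method names here are illustrative, not Flink's):

```java
import java.util.Optional;

// F-bounded base class: withFormat returns the concrete subclass type D,
// mirroring the Scala self-type `this: D =>` in ConnectTableDescriptor.
abstract class DescriptorSketch<D extends DescriptorSketch<D>> {
    Optional<String> format = Optional.empty();

    @SuppressWarnings("unchecked")
    public D withFormat(String format) {
        this.format = Optional.of(format);
        // safe as long as subclasses close the recursion: X extends DescriptorSketch<X>
        return (D) this;
    }
}

class StreamDescriptorSketch extends DescriptorSketch<StreamDescriptorSketch> {
    boolean appendMode = false;

    // subclass-specific method that stays reachable after withFormat(...)
    public StreamDescriptorSketch inAppendMode() {
        this.appendMode = true;
        return this;
    }
}
```

Because withFormat is declared to return D, `new StreamDescriptorSketch().withFormat("csv").inAppendMode()` type-checks; with a plain `DescriptorSketch` return type the chain would break.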

FormatDescriptor

flink-table-common-1.7.1-sources.jar!/org/apache/flink/table/descriptors/FormatDescriptor.java

@PublicEvolving
public abstract class FormatDescriptor extends DescriptorBase implements Descriptor {

	private String type;

	private int version;

	/**
	 * Constructs a {@link FormatDescriptor}.
	 *
	 * @param type string that identifies this format
	 * @param version property version for backwards compatibility
	 */
	public FormatDescriptor(String type, int version) {
		this.type = type;
		this.version = version;
	}

	@Override
	public final Map<String, String> toProperties() {
		final DescriptorProperties properties = new DescriptorProperties();
		properties.putString(FormatDescriptorValidator.FORMAT_TYPE, type);
		properties.putInt(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION, version);
		properties.putProperties(toFormatProperties());
		return properties.asMap();
	}

	/**
	 * Converts this descriptor into a set of format properties. Usually prefixed with
	 * {@link FormatDescriptorValidator#FORMAT}.
	 */
	protected abstract Map<String, String> toFormatProperties();
}
  • FormatDescriptor is an abstract class; Csv, Json, and Avro are subclasses of it
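toProperties is a template method: the base class writes the common keys, then merges whatever the subclass returns from toFormatProperties. A simplified stand-alone sketch of that mechanic (the key strings follow Flink's validator constants, but the classes here are illustrative, not Flink's):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified sketch of FormatDescriptor.toProperties(): the base class emits
// "format.type" and "format.property-version", then appends the
// subclass-specific properties supplied via the abstract hook.
abstract class FormatDescriptorSketch {
    private final String type;
    private final int version;

    FormatDescriptorSketch(String type, int version) {
        this.type = type;
        this.version = version;
    }

    public final Map<String, String> toProperties() {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("format.type", type);
        props.put("format.property-version", String.valueOf(version));
        props.putAll(toFormatProperties()); // template method: filled in by Csv/Json/Avro
        return props;
    }

    protected abstract Map<String, String> toFormatProperties();
}

class CsvSketch extends FormatDescriptorSketch {
    private String fieldDelimiter = ",";

    CsvSketch() { super("csv", 1); }

    CsvSketch fieldDelimiter(String delim) { this.fieldDelimiter = delim; return this; }

    @Override
    protected Map<String, String> toFormatProperties() {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("format.field-delimiter", fieldDelimiter);
        return props;
    }
}
```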

Csv

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Csv.scala

class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {

  private var fieldDelim: Option[String] = None
  private var lineDelim: Option[String] = None
  private val schema: mutable.LinkedHashMap[String, String] =
    mutable.LinkedHashMap[String, String]()
  private var quoteCharacter: Option[Character] = None
  private var commentPrefix: Option[String] = None
  private var isIgnoreFirstLine: Option[Boolean] = None
  private var lenient: Option[Boolean] = None

  def fieldDelimiter(delim: String): Csv = {
    this.fieldDelim = Some(delim)
    this
  }

  def lineDelimiter(delim: String): Csv = {
    this.lineDelim = Some(delim)
    this
  }

  def schema(schema: TableSchema): Csv = {
    this.schema.clear()
    schema.getFieldNames.zip(schema.getFieldTypes).foreach { case (n, t) =>
      field(n, t)
    }
    this
  }

  def field(fieldName: String, fieldType: TypeInformation[_]): Csv = {
    field(fieldName, TypeStringUtils.writeTypeInfo(fieldType))
    this
  }

  def field(fieldName: String, fieldType: String): Csv = {
    if (schema.contains(fieldName)) {
      throw new ValidationException(s"Duplicate field name $fieldName.")
    }
    schema += (fieldName -> fieldType)
    this
  }

  def quoteCharacter(quote: Character): Csv = {
    this.quoteCharacter = Option(quote)
    this
  }

  def commentPrefix(prefix: String): Csv = {
    this.commentPrefix = Option(prefix)
    this
  }

  def ignoreFirstLine(): Csv = {
    this.isIgnoreFirstLine = Some(true)
    this
  }

  def ignoreParseErrors(): Csv = {
    this.lenient = Some(true)
    this
  }

  override protected def toFormatProperties: util.Map[String, String] = {
    val properties = new DescriptorProperties()

    fieldDelim.foreach(properties.putString(FORMAT_FIELD_DELIMITER, _))
    lineDelim.foreach(properties.putString(FORMAT_LINE_DELIMITER, _))

    val subKeys = util.Arrays.asList(
      DescriptorProperties.TABLE_SCHEMA_NAME,
      DescriptorProperties.TABLE_SCHEMA_TYPE)

    val subValues = schema.map(e => util.Arrays.asList(e._1, e._2)).toList.asJava

    properties.putIndexedFixedProperties(
      FORMAT_FIELDS,
      subKeys,
      subValues)
    quoteCharacter.foreach(properties.putCharacter(FORMAT_QUOTE_CHARACTER, _))
    commentPrefix.foreach(properties.putString(FORMAT_COMMENT_PREFIX, _))
    isIgnoreFirstLine.foreach(properties.putBoolean(FORMAT_IGNORE_FIRST_LINE, _))
    lenient.foreach(properties.putBoolean(FORMAT_IGNORE_PARSE_ERRORS, _))

    properties.asMap()
  }
}
  • Csv provides methods such as field, fieldDelimiter, lineDelimiter, quoteCharacter, commentPrefix, ignoreFirstLine, and ignoreParseErrors
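In toFormatProperties above, the ordered schema fields are flattened via putIndexedFixedProperties into indexed keys such as `format.fields.0.name` / `format.fields.0.type`. A stand-alone sketch of that flattening (the key layout is inferred from the constants in the source; this is not Flink's DescriptorProperties class):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of DescriptorProperties.putIndexedFixedProperties: each row of
// sub-values is written under "<key>.<index>.<subKey>".
class IndexedProps {
    static Map<String, String> putIndexedFixedProperties(
            String key, List<String> subKeys, List<List<String>> subValues) {
        Map<String, String> props = new LinkedHashMap<>();
        for (int i = 0; i < subValues.size(); i++) {
            for (int j = 0; j < subKeys.size(); j++) {
                props.put(key + "." + i + "." + subKeys.get(j), subValues.get(i).get(j));
            }
        }
        return props;
    }

    public static void main(String[] args) {
        // two schema fields become format.fields.0.* and format.fields.1.*
        Map<String, String> props = putIndexedFixedProperties(
            "format.fields",
            Arrays.asList("name", "type"),
            Arrays.asList(
                Arrays.asList("field1", "VARCHAR"),
                Arrays.asList("field2", "TIMESTAMP")));
        System.out.println(props);
    }
}
```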

Json

flink-json-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Json.java

public class Json extends FormatDescriptor {

	private Boolean failOnMissingField;
	private Boolean deriveSchema;
	private String jsonSchema;
	private String schema;

	public Json() {
		super(FORMAT_TYPE_VALUE, 1);
	}

	public Json failOnMissingField(boolean failOnMissingField) {
		this.failOnMissingField = failOnMissingField;
		return this;
	}

	public Json jsonSchema(String jsonSchema) {
		Preconditions.checkNotNull(jsonSchema);
		this.jsonSchema = jsonSchema;
		this.schema = null;
		this.deriveSchema = null;
		return this;
	}

	public Json schema(TypeInformation<Row> schemaType) {
		Preconditions.checkNotNull(schemaType);
		this.schema = TypeStringUtils.writeTypeInfo(schemaType);
		this.jsonSchema = null;
		this.deriveSchema = null;
		return this;
	}

	public Json deriveSchema() {
		this.deriveSchema = true;
		this.schema = null;
		this.jsonSchema = null;
		return this;
	}

	@Override
	protected Map<String, String> toFormatProperties() {
		final DescriptorProperties properties = new DescriptorProperties();

		if (deriveSchema != null) {
			properties.putBoolean(FORMAT_DERIVE_SCHEMA, deriveSchema);
		}
		if (jsonSchema != null) {
			properties.putString(FORMAT_JSON_SCHEMA, jsonSchema);
		}
		if (schema != null) {
			properties.putString(FORMAT_SCHEMA, schema);
		}
		if (failOnMissingField != null) {
			properties.putBoolean(FORMAT_FAIL_ON_MISSING_FIELD, failOnMissingField);
		}

		return properties.asMap();
	}
}
  • Json provides schema, jsonSchema, and deriveSchema to define the JSON format
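Note in the source above that each schema setter nulls out the other two, so the three definitions are mutually exclusive and the last call wins. A tiny stand-alone sketch of that last-call-wins behavior (simplified fields, not Flink's class):

```java
// Sketch of Json's mutually exclusive schema setters: each setter clears
// the alternatives, so only the most recent schema definition survives.
class JsonSketch {
    String jsonSchema;
    String schema;
    Boolean deriveSchema;

    JsonSketch jsonSchema(String s) {
        this.jsonSchema = s;
        this.schema = null;
        this.deriveSchema = null;
        return this;
    }

    JsonSketch schema(String typeString) {
        this.schema = typeString;
        this.jsonSchema = null;
        this.deriveSchema = null;
        return this;
    }

    JsonSketch deriveSchema() {
        this.deriveSchema = true;
        this.schema = null;
        this.jsonSchema = null;
        return this;
    }
}
```

Calling `jsonSchema(...)` followed by `deriveSchema()` therefore leaves only deriveSchema set; toFormatProperties then emits exactly one schema definition.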

Avro

flink-avro-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Avro.java

public class Avro extends FormatDescriptor {

	private Class<? extends SpecificRecord> recordClass;
	private String avroSchema;

	public Avro() {
		super(AvroValidator.FORMAT_TYPE_VALUE, 1);
	}

	public Avro recordClass(Class<? extends SpecificRecord> recordClass) {
		Preconditions.checkNotNull(recordClass);
		this.recordClass = recordClass;
		return this;
	}

	public Avro avroSchema(String avroSchema) {
		Preconditions.checkNotNull(avroSchema);
		this.avroSchema = avroSchema;
		return this;
	}

	@Override
	protected Map<String, String> toFormatProperties() {
		final DescriptorProperties properties = new DescriptorProperties();

		if (null != recordClass) {
			properties.putClass(AvroValidator.FORMAT_RECORD_CLASS, recordClass);
		}
		if (null != avroSchema) {
			properties.putString(AvroValidator.FORMAT_AVRO_SCHEMA, avroSchema);
		}

		return properties.asMap();
	}
}
  • Avro provides two ways to define the Avro format: recordClass and avroSchema

Summary

  • StreamTableEnvironment’s connect method creates a StreamTableDescriptor; StreamTableDescriptor extends ConnectTableDescriptor
  • ConnectTableDescriptor provides a withFormat method that accepts a FormatDescriptor and returns the descriptor itself; FormatDescriptor is an abstract class, and Csv, Json, and Avro are its subclasses
  • Csv provides methods such as field, fieldDelimiter, lineDelimiter, quoteCharacter, commentPrefix, ignoreFirstLine, and ignoreParseErrors; Json provides schema, jsonSchema, and deriveSchema to define the JSON format; Avro provides two ways to define the Avro format: recordClass and avroSchema

Doc

  • Table Formats