I. Hadoop

1. Hadoop distributed storage

1) Hadoop architecture

Hadoop consists of three parts: HDFS for distributed storage, MapReduce for distributed computing, and YARN as the resource scheduling engine. The historical evolution of Hadoop is shown in the following figure:

2) HDFS architecture

Most big data systems use a master-slave architecture, and HDFS is one of them. The master, also called the management node, is the NameNode; the slaves, also called worker nodes, are the DataNodes.

3) SecondaryNameNode principle

1. What is the problem with keeping metadata in NameNode memory, and what mechanisms solve it?

The NameNode keeps the file system metadata in memory, so when the system fails, that metadata is lost. To protect against this, HDFS keeps an edit log (editlog) on the NameNode that records every operation clients perform on HDFS, such as add, delete, and rename.

However, the editlog grows over time, and the larger it is, the longer a restart takes to replay it. To avoid this, HDFS introduces a checkpoint mechanism. The fsimage file is a persistent checkpoint of the HDFS metadata: the metadata in memory is saved to a file on disk.

When the NameNode restarts, it reads the fsimage file from disk into memory, which restores the metadata to the checkpoint, and then replays the editlog entries written after the checkpoint to restore the metadata completely. Even so, the editlog keeps growing as time goes by, so a NameNode restart can take a long time, during which HDFS is unavailable. To solve this problem, HDFS introduces the SecondaryNameNode, which periodically merges fsimage and editlog.

SecondaryNameNode checkpoint prerequisites:

  • A checkpoint is created when either of two conditions is met:
    • One hour has passed since the last checkpoint (dfs.namenode.checkpoint.period, 3600 seconds by default)
    • SecondaryNameNode checks every minute whether the editlog has accumulated 1,000,000 transactions since the last checkpoint (dfs.namenode.checkpoint.txns), and if so, creates a checkpoint.
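
The two trigger conditions can be sketched as a small predicate. This is only an illustration: the function and parameter names are hypothetical, and both thresholds are passed in explicitly because they are configurable on a real cluster.

```python
def should_checkpoint(seconds_since_last, txns_since_last, period_s, txn_limit):
    """Return True when either checkpoint condition is met.

    period_s  -- maximum time between checkpoints, in seconds
    txn_limit -- maximum number of editlog transactions between checkpoints
    """
    time_due = seconds_since_last >= period_s
    txns_due = txns_since_last >= txn_limit
    return time_due or txns_due


# Example: an hourly period and a transaction limit chosen by the operator.
print(should_checkpoint(120, 50, period_s=3600, txn_limit=1000))     # neither condition met
print(should_checkpoint(120, 5000, period_s=3600, txn_limit=1000))   # transaction limit reached
print(should_checkpoint(4000, 50, period_s=3600, txn_limit=1000))    # period elapsed
```

The time condition bounds how stale the fsimage can get on a quiet cluster, while the transaction condition bounds how long a replay can take on a busy one.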

Process:

  • SecondaryNameNode first asks the primary NameNode to roll its edits file, so that new edit-log entries go into a new file.
  • SecondaryNameNode then retrieves the fsimage and edits files from the primary NameNode through HTTP GET.
  • SecondaryNameNode loads fsimage into memory, applies each operation from edits in order, and creates a new fsimage file.
  • SecondaryNameNode sends the new fsimage file back to the primary NameNode via HTTP PUT.
  • The primary NameNode replaces the old fsimage with the new one and records the checkpoint time.
  • At the end of the process, the NameNode has an up-to-date fsimage and a short edits file.
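
The merge step, loading fsimage and replaying edits on top of it, can be sketched as follows. This is a toy model, not the real on-disk formats: the namespace is a plain dict, and the operation tuples are a hypothetical encoding of the add, delete, and rename records.

```python
def apply_edits(fsimage, edits):
    """Replay edit-log operations on a copy of the fsimage and return the new image."""
    namespace = dict(fsimage)  # never mutate the old checkpoint
    for op, *args in edits:
        if op == "create":
            (path,) = args
            namespace[path] = {"blocks": []}
        elif op == "delete":
            (path,) = args
            namespace.pop(path, None)
        elif op == "rename":
            src, dst = args
            namespace[dst] = namespace.pop(src)
        else:
            raise ValueError(f"unknown edit-log operation: {op}")
    return namespace


old_image = {"/data/a.txt": {"blocks": ["blk_1"]}}
edits = [
    ("create", "/data/b.txt"),
    ("rename", "/data/a.txt", "/data/c.txt"),
    ("delete", "/data/b.txt"),
]
new_image = apply_edits(old_image, edits)
print(sorted(new_image))  # the paths in the merged image
```

After the merge, the new image already reflects every replayed operation, so the old edits can be discarded and only entries written after the roll need replaying at the next restart.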

4) Rack aware storage principle

A machine room contains racks, and each rack holds several servers. The replicas of a block are placed as follows:

  • The first replica is stored on the local machine, that is, the DataNode where the client is running.
  • The second replica is stored on a DataNode (D2) on a different rack.
  • The third replica is stored on another DataNode on the same rack as D2.
  • Any further replicas are stored on random nodes.
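
A minimal sketch of this placement policy, assuming the writer runs on a DataNode, that there are at least two racks, and that the rack chosen for the second replica holds at least two nodes. The `topology` structure and the function name are hypothetical.

```python
import random


def choose_replicas(topology, writer, replication=3, rng=None):
    """Pick DataNodes for one block.

    topology -- dict mapping rack name -> list of DataNode names
    writer   -- the DataNode the client is running on
    """
    rng = rng or random.Random()
    rack_of = {dn: rack for rack, nodes in topology.items() for dn in nodes}
    chosen = [writer]  # replica 1: the local node
    if replication >= 2:
        # replica 2: any node on a different rack
        remote = [dn for dn in rack_of if rack_of[dn] != rack_of[writer]]
        second = rng.choice(remote)
        chosen.append(second)
    if replication >= 3:
        # replica 3: a different node on the same rack as replica 2
        neighbours = [dn for dn in topology[rack_of[second]] if dn != second]
        chosen.append(rng.choice(neighbours))
    while len(chosen) < replication:
        # further replicas: random nodes that do not hold the block yet
        remaining = [dn for dn in rack_of if dn not in chosen]
        chosen.append(rng.choice(remaining))
    return chosen


topology = {"rack1": ["dn1", "dn2", "dn3"], "rack2": ["dn4", "dn5", "dn6"]}
print(choose_replicas(topology, writer="dn1"))
```

Putting two of the three replicas on one remote rack keeps cross-rack write traffic to a single transfer while still surviving the loss of either rack.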

5) Heartbeat mechanism

Working principle:

  • When the NameNode starts, it starts an IPC server internally
  • After a DataNode starts, it registers with the NameNode and then sends a heartbeat to the NameNode every 3 seconds
  • The heartbeat response carries commands from the NameNode to the DataNode, such as copying a block to another DataNode or deleting a block
  • If the NameNode receives no heartbeat from a DataNode for about 10 minutes (10 minutes 30 seconds with the default settings), it considers that DataNode unavailable
  • Each DataNode periodically (every 6 hours) sends a block report to the NameNode. The block report lists all data blocks on that DataNode
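
The dead-node check can be sketched as below. The roughly ten-minute timeout comes from the NameNode's default rule of two recheck intervals (5 minutes each) plus ten heartbeat intervals (3 seconds each); the `HeartbeatMonitor` class is a hypothetical stand-in for the NameNode's bookkeeping, not its real implementation.

```python
def dead_node_timeout(heartbeat_s=3, recheck_s=300):
    """Seconds without a heartbeat after which a DataNode is declared dead."""
    return 2 * recheck_s + 10 * heartbeat_s


class HeartbeatMonitor:
    """Tracks the last heartbeat time of each DataNode."""

    def __init__(self, timeout_s):
        self.timeout_s = timeout_s
        self.last_seen = {}

    def heartbeat(self, datanode, now):
        self.last_seen[datanode] = now

    def dead_nodes(self, now):
        return sorted(
            dn for dn, seen in self.last_seen.items()
            if now - seen > self.timeout_s
        )


monitor = HeartbeatMonitor(dead_node_timeout())
monitor.heartbeat("dn1", now=0)      # dn1 goes silent after this
monitor.heartbeat("dn2", now=600)    # dn2 is still reporting
print(dead_node_timeout())           # 630 seconds with the defaults
print(monitor.dead_nodes(now=700))   # only dn1 has exceeded the timeout
```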

Function:

  • The NameNode returns instructions to DataNodes in its heartbeat responses
  • Heartbeats show whether a DataNode is online
  • Through block reports, the NameNode learns the storage status of each DataNode, such as disk utilization and the block list. This information is used for load balancing
  • When a Hadoop cluster starts, it stays in safe mode until 99.9% of blocks have reached the minimum replica count; the NameNode determines this from the block reports

6) HDFS read and write process:

7) HDFS write process

Detailed process:

  • Create the file:
    • To write data to HDFS, the HDFS client first calls DistributedFileSystem.create() to create a new file in HDFS
    • ClientProtocol.create() remotely invokes create() on the NameNode (NameNodeRpcServer), which adds the new file at the specified path in the HDFS directory tree
    • The creation of the new file is then recorded in the editlog
    • After NameNode.create() completes, DistributedFileSystem.create() returns an FSDataOutputStream, which is essentially a wrapper around a DFSOutputStream object
  • Establish the data flow pipeline:
    • The client calls DFSOutputStream.write() to write data
    • DFSOutputStream first calls ClientProtocol.addBlock() to request an empty data block from the NameNode
    • addBlock() returns a LocatedBlock object, which contains the locations of all DataNodes that will store the current block
    • The data flow pipeline is established from this location information
  • Write the current block to the pipeline:
    • The client writes data into the pipeline. It first fills a chunk of 512 bytes; when the chunk is full, the client calculates the checksum of the chunk (4 bytes)
    • The chunk data plus its checksum form a chunk with checksum (516 bytes)
    • These chunks are collected into a larger structure, the packet, whose size is 64 KB
    • When the packet is full, it is first written to the dataQueue
    • The packet is taken from the queue and written to the pipeline: first to datanode1, then to datanode2, and then to datanode3
    • After a packet has been taken from the dataQueue, it is put into the ackQueue to wait for the pipeline's ACK for that packet
      • Each packet has an ACK packet, which travels back along the pipeline in reverse (DN3 -> DN2 -> DN1)
    • If the ACK for a packet reports success, the packet is removed from the ackQueue. Otherwise, the packet is taken out of the ackQueue, put back in the dataQueue, and resent
    • If there are more blocks to write after the current block has been written, addBlock() is called again and the steps above are repeated
    • After the data of the last block in the file has been written, an empty packet is sent to indicate that the block is complete, and the pipeline is closed. When all blocks have been written, close() is called to close the stream
    • ClientProtocol.complete() notifies the NameNode that all blocks of the current file have been written
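
The chunk, packet, and queue mechanics above can be sketched as follows. This is a simplified model under stated assumptions: the packet header is ignored (so the chunk count per packet is slightly higher than in a real client), zlib's CRC32 stands in for the real checksum, ACKs arrive synchronously, and all names are hypothetical.

```python
import zlib
from collections import deque

CHUNK_SIZE = 512          # bytes of data per chunk
CHECKSUM_SIZE = 4         # bytes of checksum per chunk
PACKET_SIZE = 64 * 1024   # bytes per packet (header ignored in this sketch)


def make_packets(data):
    """Split data into checksummed chunks and group the chunks into packets."""
    per_packet = PACKET_SIZE // (CHUNK_SIZE + CHECKSUM_SIZE)
    framed = []
    for start in range(0, len(data), CHUNK_SIZE):
        chunk = data[start:start + CHUNK_SIZE]
        checksum = zlib.crc32(chunk).to_bytes(CHECKSUM_SIZE, "big")
        framed.append(chunk + checksum)
    return [
        b"".join(framed[i:i + per_packet])
        for i in range(0, len(framed), per_packet)
    ]


def send_all(packets, ack_ok):
    """Drain the dataQueue; ack_ok(n) says whether the n-th transmission is acknowledged."""
    data_queue = deque(packets)
    ack_queue = deque()
    transmissions = 0
    while data_queue:
        packet = data_queue.popleft()
        ack_queue.append(packet)       # packet now waits for its ACK
        transmissions += 1
        if ack_ok(transmissions):
            ack_queue.popleft()        # success: forget the packet
        else:
            data_queue.appendleft(ack_queue.pop())  # failure: resend it next
    return transmissions


packets = make_packets(b"x" * 200_000)
print(len(packets), len(packets[0]))
print(send_all(packets, ack_ok=lambda n: n != 2))  # one failed ACK costs one resend
```

Keeping unacknowledged packets in a separate ackQueue is what makes recovery possible: nothing is discarded until every DataNode in the pipeline has confirmed it.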

Fault tolerance

  • How the output stream recovers when a DataNode in the pipeline fails during a write
    • All packets held in the ackQueue are put back into the dataQueue
    • The output stream calls ClientProtocol.updateBlockForPipeline() to request a new timestamp (the block's generation stamp) for the block, and the NameNode records the new timestamp
    • This ensures that, even if the failed DataNode recovers, the timestamp of its copy of the block differs from the new timestamp recorded by the NameNode, so that copy is deleted. The failed DataNode is removed from the pipeline
    • The output stream calls ClientProtocol.getAdditionalDatanode() to ask the NameNode to assign a new DataNode to the pipeline, and the pipeline is re-established with the new timestamp
    • The newly added DataNode does not yet store the block. The HDFS client uses the DataTransferProtocol to tell a DataNode already in the pipeline to copy the block to the new DataNode
    • Once the pipeline is rebuilt, the output stream calls ClientProtocol.updatePipeline() to update the metadata on the NameNode
    • With the fault handled, the remaining data is written as usual

8) HDFS read process principle

The HDFS read process is similar to the write process:

  • To read an HDFS file, the client calls the open() method of the file system object, DistributedFileSystem
  • open() returns an FSDataInputStream object, which wraps a DFSInputStream
  • When the DFSInputStream is constructed, it calls the NameNode's getBlockLocations() method to obtain the list of DataNodes storing the first few blocks of the file. The DataNode list for each block is sorted by network topology, with the DataNode closest to the client first
  • The read() method of DFSInputStream is called to read the data of block1; it connects to the DataNode nearest to the client and reads from it
  • After the block has been read, the connection to that DataNode is closed
  • The data of the next block, block2, is read in the same way
  • After all file data has been read, the close() method of FSDataInputStream is called

Fault tolerance:

  • Case 1: During a block read, communication between the client and the DataNode fails
    • The client connects to the next DataNode that stores the block and reads the data from it
    • The client records the faulty DataNode and does not read from it again
  • Case 2: The block data read by the client is corrupt
    • When the client reads block data, it also reads the checksum stored with it. If the checksum the client computes over the data differs from the stored checksum, the block data is damaged
    • The client reads the block from another DataNode that stores a replica (and again verifies the checksum)
    • At the same time, the client reports the corrupt replica to the NameNode
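
Case 2 can be sketched as a read loop that verifies each replica and falls back to the next one. This is an illustration only: zlib's CRC32 stands in for the checksum HDFS actually uses, and the `replicas` structure and function name are hypothetical.

```python
import zlib


def read_block(replicas, order):
    """Read a block from the first replica whose checksum matches.

    replicas -- dict mapping DataNode name -> (data, stored_checksum)
    order    -- DataNode names sorted by distance from the client
    Returns the data and the list of DataNodes whose replica was corrupt,
    which the client would report to the NameNode.
    """
    corrupt = []
    for datanode in order:
        data, stored_checksum = replicas[datanode]
        if zlib.crc32(data) == stored_checksum:
            return data, corrupt
        corrupt.append(datanode)  # checksum mismatch: try the next replica
    raise IOError("no intact replica found")


good = b"gone with the wind"
replicas = {
    "dn1": (b"gone with the wand", zlib.crc32(good)),  # bit rot on dn1
    "dn2": (good, zlib.crc32(good)),
}
data, to_report = read_block(replicas, order=["dn1", "dn2"])
print(data, to_report)
```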

9) HA (high availability)

HDFS high availability: in HDFS, the NameNode (NN) stores metadata in memory, manages the file system namespace, and handles client read and write requests. However, there is only one NN, so it is a single point of failure: if it goes down, the entire cluster is unusable. The SecondaryNameNode (SNN) cannot take over and serve clients immediately, so the HDFS service stops.

  • HDFS 2.x adopts an HA architecture:

    • In an HA cluster, you can configure two NNs, one active and one standby
    • ZooKeeper coordinates which NN is active and which is standby
    • The active NN responds to all client requests. The standby NN acts as a hot backup and keeps its data synchronized with the active NN
    • When the active node fails, the ZooKeeper cluster detects the fault and the standby node immediately switches to the active state to serve clients
    • This keeps the cluster available at all times
  • How data is hot-backed up

    • The standby NN is a hot backup of the active NN, so the state of the active NN must be synchronized to the standby NN in real time
    • A shared storage system, such as NFS (Network File System), Quorum Journal Manager (QJM), or ZooKeeper, can be used for state synchronization
    • The active NN writes its updates to the shared storage system, and the standby NN watches that system continuously. As soon as new data is written, the standby NN reads it from the shared storage system and loads it into its own memory, so that its state stays consistent with the active NN
  • Block reports

    • The NN maintains the mapping between data blocks and their actual storage locations. For failover to be fast, the standby NN must also hold the latest block mapping
    • Therefore, every DataNode is configured with the addresses of both the active and standby NNs, and sends block locations and heartbeats to both NNs at the same time

10) Federation (Understand)

1. Why federation? Although HA solves the single point of failure, HDFS still has problems with scalability, overall performance, and isolation

  • Scalability: metadata is stored in NN memory, so the namespace is limited by memory capacity (each file, directory, or block occupies about 150 bytes)
  • Overall performance: throughput is limited by the single NN
  • Isolation: one program may affect the performance of others. If one program consumes too many resources, other programs may not run smoothly
  • HA still has, in essence, a single active NameNode

2. Federation addresses the three problems above

  • HDFS federation defines multiple namespaces, each with its own NN (or one active and one standby NN), so that the HDFS naming service can scale horizontally
  • Each NN manages its own namespace and blocks independently and does not need to coordinate with the others
  • Each DN registers with all NNs in the cluster and periodically sends heartbeats and block information to all of them to report its status
  • In HDFS federation, each independent NN corresponds to an independent namespace
  • Each namespace manages its own set of blocks, which is called a block pool
  • Each DN provides block storage for all block pools, so the blocks of one block pool are physically spread across different DNs

11) Compression (Understand)

  • Benefits of file compression:

    • Reduces the disk space occupied by data
    • Speeds up data I/O on disk and over the network
  • Common compression formats

    Compression format | UNIX tool | Algorithm | File extension | Splittable
    DEFLATE            | N/A       | DEFLATE   | .deflate       | No
    gzip               | gzip      | DEFLATE   | .gz            | No
    zip                | zip       | DEFLATE   | .zip           | Yes
    bzip2              | bzip2     | bzip2     | .bz2           | Yes
    LZO                | lzop      | LZO       | .lzo           | No
    Snappy             | N/A       | Snappy    | .snappy        | No
  • Hadoop compression implementation classes, all of which implement the CompressionCodec interface

    Compression format | Codec class
    DEFLATE            | org.apache.hadoop.io.compress.DefaultCodec
    gzip               | org.apache.hadoop.io.compress.GzipCodec
    bzip2              | org.apache.hadoop.io.compress.BZip2Codec
    LZO                | com.hadoop.compression.lzo.LzopCodec
    Snappy             | org.apache.hadoop.io.compress.SnappyCodec
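
Hadoop can infer the codec from a file's extension. The lookup below is a rough sketch of that idea built only from the two tables above; the `CODECS` dict and `codec_for` function are hypothetical helpers, not part of Hadoop.

```python
import os

# File extension -> (codec class name, splittable), taken from the tables above.
CODECS = {
    ".deflate": ("org.apache.hadoop.io.compress.DefaultCodec", False),
    ".gz": ("org.apache.hadoop.io.compress.GzipCodec", False),
    ".bz2": ("org.apache.hadoop.io.compress.BZip2Codec", True),
    ".lzo": ("com.hadoop.compression.lzo.LzopCodec", False),
    ".snappy": ("org.apache.hadoop.io.compress.SnappyCodec", False),
}


def codec_for(path):
    """Return (codec class name, splittable) for a path, or None if uncompressed."""
    _, extension = os.path.splitext(path)
    return CODECS.get(extension.lower())


print(codec_for("/logs/2024/events.bz2"))
print(codec_for("/logs/2024/events.txt"))
```

The splittable flag matters for MapReduce: a large file in a non-splittable format has to be processed by a single map task.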

12) Small file governance (Understand)

SequenceFile solution:

  • A SequenceFile consists mainly of records; each record is a key-value pair
  • A SequenceFile can be used as a storage container for small files.
    • Each record holds the contents of one small file
    • The small file's name is the key of the record
    • The small file's contents are the value of the record
    • For example, you can write a program that packs 10,000 small files of 100 KB each into one SequenceFile.
  • A SequenceFile is splittable, so MapReduce can cut the file into splits and process each split independently.
  • Specific structure (as shown in the figure):
    • A SequenceFile begins with a header whose first 4 bytes are the magic bytes SEQ followed by the file version number
    • The header is followed by a number of records
    • Sync markers are inserted between records at intervals so that a reader can locate record boundaries
  • Unlike HAR, SequenceFile supports compression. The structure of a record depends on whether compression is enabled
    • Three compression modes are available:
      • NONE: no compression (as shown in the figure)
      • RECORD: each record is compressed individually (as shown in the figure)
      • BLOCK: (1) multiple records are compressed together; (2) a sync point is inserted at the beginning of each new block (as shown in the figure)
    • In most cases, BLOCK compression is the best option
    • Because a block contains multiple records, the similarity between records can be exploited and the compression ratio is higher
    • Converting existing data to a SequenceFile is slow. Rather than writing small files first and then copying them into a SequenceFile, it is better to write data directly into a SequenceFile and skip the small files as an intermediate step.
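
The container idea, one record per small file with the file name as key and the contents as value, can be illustrated with a toy format. This is deliberately not the real SequenceFile layout (no header, sync markers, or compression); the length-prefixed encoding and the function names are assumptions made for the sketch.

```python
import io
import struct


def pack(files):
    """Pack {name: content bytes} into one blob of length-prefixed key-value records."""
    out = io.BytesIO()
    for name, content in files.items():
        key = name.encode("utf-8")
        # Record layout: 4-byte key length, 4-byte value length, key, value.
        out.write(struct.pack(">II", len(key), len(content)))
        out.write(key)
        out.write(content)
    return out.getvalue()


def unpack(blob):
    """Inverse of pack(): recover {name: content bytes} from the blob."""
    files = {}
    pos = 0
    while pos < len(blob):
        key_len, value_len = struct.unpack_from(">II", blob, pos)
        pos += 8
        name = blob[pos:pos + key_len].decode("utf-8")
        pos += key_len
        files[name] = blob[pos:pos + value_len]
        pos += value_len
    return files


small_files = {"a.txt": b"hello", "b.txt": b""}
blob = pack(small_files)
print(len(blob), unpack(blob) == small_files)
```

However many small files go in, the NameNode only has to track one large file, which is the point of the approach.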

2. MapReduce distributed computing

1) MapReduce principle

MapReduce is a distributed computing framework based on the idea of divide and conquer. It consists of two parts: the map phase (splitting the job into small tasks) and the reduce phase (aggregating the results of the small tasks).

  • Map phase (word count as an example)
    • Suppose the MR input file "gone with the wind" has three blocks: block1, block2, and block3
    • In MR programming, each block corresponds to one map task.
    • Take the first map task as an example:
      • map1 reads the data of block1, one line at a time.
      • For each line it generates a key/value pair and passes it to the map() method as input
      • Assuming the first line has been read, the key is the byte offset of that line from the start of the block (0), and the value is the content of the line
      • Inside map(), the value (the current line) is split on spaces to obtain the words Dear | Bear | River
      • Each word is turned into a key/value pair, giving the output (Dear, 1) | (Bear, 1) | (River, 1); the final result is written to the local disk of the node where the map task runs
      • After the first line is processed, the second line is processed. The map task ends when all data in the block has been processed
  • Reduce phase
    • The number of reduce tasks is specified by the user's program. All reduce tasks run the same logic, so the first reduce task is used as an example.
      • After map1 completes, reduce1 connects to map1 over the network and fetches the partition of map1's output that belongs to reduce1
      • It also connects to map2 and map3 to fetch their results. In the end, reduce1 has obtained four (Dear, 1) key-value pairs. Since their keys are the same, they fall into the same group. The four (Dear, 1) pairs are converted into [Dear, Iterable(1, 1, 1, 1)] and passed to reduce() as its two input parameters.
      • Inside reduce(), the total count for Dear is computed as 4, and (Dear, 4) is output as a key-value pair
      • Each reduce task outputs one file, which is written to HDFS
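
The word-count flow above can be simulated in a few lines. This is a single-process sketch, not Hadoop code: each string in `blocks` plays the role of one HDFS block, grouping by key stands in for the shuffle, and the sample text is made up for the example.

```python
from collections import defaultdict


def map_fn(offset, line):
    """Map: key is the byte offset of the line, value is the line; emit (word, 1)."""
    for word in line.split():
        yield word, 1


def reduce_fn(key, values):
    """Reduce: sum the counts for one word."""
    return key, sum(values)


def run_job(blocks):
    grouped = defaultdict(list)
    for block in blocks:                      # one map task per block
        offset = 0
        for line in block.splitlines(keepends=True):
            for key, value in map_fn(offset, line):
                grouped[key].append(value)    # group values by key (the shuffle)
            offset += len(line.encode("utf-8"))
    return dict(reduce_fn(key, values) for key, values in sorted(grouped.items()))


blocks = [
    "Dear Bear River\nCar Car River\n",
    "Deer Car Bear\nDear Dear Dear\n",
]
print(run_job(blocks))
```

Note that map_fn ignores its key: for word count the byte offset is only there because the input format supplies it.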

[Figures: shuffle diagram; shuffle detail]

2) shuffle

Shuffle is the process by which map output becomes reduce input

  • The map side
    • Each map task has its own ring buffer. The map output, a stream of KV pairs, is first written to the ring buffer (default size: 100 MB). When the content fills 80% of the buffer, a background thread spills the buffered data to a file on disk
    • During a spill, the map task can keep writing to the ring buffer. However, if the map writes faster than the spill drains and the 100 MB buffer fills up, the map task stops writing and only the spill proceeds, until the buffered data has been spilled to disk
    • The background spill thread performs the following steps:
      • It first partitions each KV pair to be spilled. The number of partitions is determined by the number of reduce tasks of the MR program. By default, HashPartitioner decides which partition a KV pair belongs to, using the formula (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
      • Within each partition, the KV pairs are sorted by key in memory
      • If a map-side combiner is configured, the sorted data in each partition is combined
      • If map output compression is enabled, the spilled data is compressed
    • As data keeps arriving in the ring buffer, spills are triggered repeatedly and the local disk ends up with multiple spill files. Before the map task completes, all spill files are merged into one large file, which is partitioned and sorted. (Details: 1. When spill files are merged, if there are at least 3 spill files and a map-side combiner is set, the combine operation also runs during the merge. 2. If there are only 1 or 2 spill files, the combine operation is not triggered, because combine is essentially a reduce operation and running it incurs some overhead.)
  • The reduce side
    • After each map task finishes, the reduce task fetches its own partition (many KV pairs) from that map task's output over HTTP
    • If the map output is small, it is held in the reduce task's JVM memory; otherwise it is written directly to the reduce node's disk
    • Once the memory buffer reaches its threshold (0.66 by default) or the number of map outputs reaches its threshold (1000 by default), a merge is triggered and the result is spilled to local disk
    • If the MR program specifies a combiner, the combine operation runs at merge time
    • As spill files accumulate, background threads merge them into larger, sorted files
    • After the reduce task has copied the output of all map tasks, it merges all spill files on disk in rounds; the merge factor (the number of files merged at once) defaults to 10
    • In the last round, some of the merged data comes from memory and some from files on disk
    • The merge, sort, and group phases are performed
    • The reduce method is called once for each group of data
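
The default partitioning formula, (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks, can be reproduced outside the JVM. The sketch below assumes keys hash like java.lang.String over characters in the Basic Multilingual Plane; other key classes, such as Hadoop's Text, compute hashCode() differently. Both function names are hypothetical.

```python
def java_string_hash(text):
    """Emulate java.lang.String.hashCode() as a signed 32-bit integer."""
    h = 0
    for ch in text:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF   # wrap like 32-bit int arithmetic
    return h - (1 << 32) if h >= (1 << 31) else h


def partition(key, num_reduce_tasks):
    """Mask off the sign bit, then take the remainder, as HashPartitioner does."""
    return (java_string_hash(key) & 0x7FFFFFFF) % num_reduce_tasks


for word in ["Dear", "Bear", "River"]:
    print(word, partition(word, 3))
```

Masking with Integer.MAX_VALUE clears the sign bit, so a negative hash can never produce a negative partition number. Every occurrence of a key lands in the same partition, which is what lets one reduce task see all values for that key.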

In summary, the shuffle process is as follows: the map task collects the KV pairs emitted by the map() method and puts them in the memory buffer. When the buffer reaches its fill threshold (0.8 by default), the data is spilled to disk. If the map output is large, there will be multiple spill files, which are merged into one large file. During spilling and merging, the partitioner is called to partition the data, and the data is sorted by key (by default, the partition is the hash of the key modulo the number of partitions). Each reduce task then fetches the data of its own partition from every map task's machine according to its partition number, and merges these files again (merge sort).

Once the files have been merged into large files, the shuffle ends and the reduce task's own logic begins: it extracts each group of key-value pairs from the file and calls the user-defined reduce method on each group.

YARN scheduling

  • The application execution process on YARN consists of three steps
    • Application submission
      • The client submits the application to RM and requests an application instance
    • Start the AppMaster instance
      • RM finds a NodeManager (NM) that can run a container and starts the AppMaster instance in that container
      • The AppMaster registers with RM. After registration, users can query RM for information about their application and can then interact with their AppMaster directly
      • The AppMaster sends resource requests to RM using the resource-request protocol
      • When a container is allocated, the AppMaster starts it by sending a container launch specification to the NM. The container launch specification contains the information the container needs to communicate with the AppMaster
    • The AppMaster instance manages application execution
      • The application runs as tasks in the started containers and sends progress and status information to the AppMaster using an application-specific protocol
      • While the application is running, the client communicates directly with the AppMaster to obtain the application's running status, progress updates, and other information
      • After the application has finished and all work is done, the AppMaster deregisters from RM and shuts down, and all containers it used are returned to the system

Short version:

  • 1. The user submits the application to RM
  • 2. RM allocates resources for the AppMaster and communicates with an NM to start the first container, which runs the AppMaster
  • 3. The AppMaster registers with RM and communicates with it to request resources for its tasks. Once the resources are granted, the AppMaster communicates with the NMs to start the corresponding tasks
  • 4. After all tasks are complete, the AppMaster deregisters from RM, and the entire application is finished

MapReduce on YARN

  • Submit the job

    • Package the program into a JAR, run the hadoop jar command on the client, and submit the job to the cluster
    • job.waitForCompletion(true) calls the job's submit() method, which calls JobSubmitter's submitJobInternal() method:
      • ② submitClient.getNewJobID() requests an MR job ID from the ResourceManager
      • The output directory is checked: an error is reported if no output directory is specified or if it already exists
      • The input splits are computed. An error is reported if they cannot be computed
      • ③ The job resources, such as the job JAR, configuration files, and input split information, are uploaded to a directory in HDFS named after the job ID. (The job JAR is stored with 10 replicas by default, so the job's map and reduce tasks can read it from any of these 10 replicas.)
      • ④ submitApplication() is called on RM to submit the job
    • The client polls the job's progress every second. If the progress has changed, a progress report is printed on the console
    • If the job succeeds, the job counters are printed
    • If the job fails, the cause of the failure is printed
  • Initialize the job

    • When RM receives the submitApplication() call, it passes the request to its scheduler, which allocates a container
    • ⑤a RM communicates with an NM and tells it to start the container. After the NM receives the request, it creates a container with the specified resources
    • ⑤b The AppMaster is then run in that container
    • ⑥ The AppMaster needs to receive progress and completion reports from tasks, so it creates several bookkeeping objects to track the job
    • ⑦ The AppMaster obtains the input splits from HDFS
      • One map task is created for each split
      • The value of the mapreduce.job.reduces property (set with job.setNumReduceTasks() in the program) determines how many reduce tasks the MR job creates
      • Each task is given a task ID
  • Task assignment

    • If the job is small, the AppMaster runs the MR job uberized: it runs the job's tasks sequentially in its own JVM
      • The reason is that when each task runs in a separate JVM, each JVM has to be started and allocated resources, which takes time; in exchange, tasks in separate JVMs run in parallel
      • If the AppMaster judges that running all tasks sequentially in its own JVM is more efficient, it does so, and the tasks run as uber tasks
      • A job counts as small when it has fewer than 10 map tasks, only one reduce task, and an input smaller than one HDFS block
      • Before running any task, the AppMaster calls the OutputCommitter's setupJob() method, which creates the job's final output directory (typically on HDFS) and temporary output directories for tasks (such as the intermediate output directory of a map task).
    • If the job does not run as an uber task, the AppMaster requests a container from RM for each task in the job
      • All map tasks must finish before the sort phase of reduce can start, so container requests for map tasks take priority over those for reduce tasks
      • Containers for reduce tasks are requested only after 5% of the map tasks have completed
      • Container requests for map tasks follow data locality: the scheduler tries to place the container on the node that holds the map task's input split (move the computation, not the data).
      • Reduce tasks can run on any compute node in the cluster
      • By default, each map or reduce task is allocated 1 GB of memory and one virtual core. These values are controlled by the properties mapreduce.map.memory.mb, mapreduce.reduce.memory.mb, mapreduce.map.cpu.vcores, and mapreduce.reduce.cpu.vcores
  • Task execution

    • After the scheduler allocates a container on an NM for the current task, it passes this information to the AppMaster; the AppMaster then communicates with that NM (NM01) and tells it to start a container with a specific amount of resources
    • NM01 receives the message and starts the container, which occupies the specified amount of resources
    • YarnChild runs in the container and executes the current task (map or reduce)
  • Job progress and status updates

    • A job and each of its tasks have a state (running, successfully completed, or failed), a progress value, and counters
    • While a task is running, it reports its progress and status (including counters) to the AppMaster every 3 seconds
    • The AppMaster aggregates the reports from all running tasks
    • The client polls the AppMaster every second for the latest job status. If anything has changed, it is printed on the console
  • Job completion

    • When the AppMaster receives the report that the last task has completed, it sets the job status to successful
    • When the client next polls the AppMaster, it learns that the job has succeeded, and the program returns from waitForCompletion()
    • All job statistics are printed on the console
    • The AppMaster and the task containers clean up their intermediate output
    • The job information is stored by the history server so that users can query it later
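
Two of the scheduling rules from the task assignment step, the small-job (uber) test and the 5% threshold before reduce containers are requested, can be written as simple predicates. This is a sketch of the rules as stated above, not the AppMaster's actual code; the function names are hypothetical and the 128 MB block size is an assumption.

```python
ASSUMED_BLOCK_SIZE = 128 * 1024 * 1024  # assumed HDFS block size, in bytes


def is_uber_eligible(num_maps, num_reduces, input_bytes,
                     max_maps=9, max_reduces=1, max_bytes=ASSUMED_BLOCK_SIZE):
    """Small job: fewer than 10 maps, at most one reduce, input within one block."""
    return (num_maps <= max_maps
            and num_reduces <= max_reduces
            and input_bytes <= max_bytes)


def reducers_may_be_requested(completed_maps, total_maps, slowstart=0.05):
    """Reduce containers are requested once this fraction of map tasks has completed."""
    if total_maps == 0:
        return True
    return completed_maps >= slowstart * total_maps


print(is_uber_eligible(num_maps=3, num_reduces=1, input_bytes=10 * 1024 * 1024))
print(is_uber_eligible(num_maps=40, num_reduces=1, input_bytes=10 * 1024 * 1024))
print(reducers_may_be_requested(completed_maps=10, total_maps=100))
```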

YARN application life cycle

  • RM: Resource Manager
  • AM: Application Master
  • NM: Node Manager
  1. The client submits an application to RM, including the AM program and the command for starting the AM.
  2. RM allocates the first container for the AM and communicates with an NM to start the application's AM in that container.
  3. When the AM starts, it registers with RM, which allows clients to obtain the AM's information from RM and communicate with the AM directly.
  4. The AM negotiates container resources for the application through the resource-request protocol.
  5. When a container is successfully allocated, the AM asks the NM to start the application in the container. Once started, the application can communicate with the AM independently.
  6. The application executes in the container and reports to the AM.
  7. During application execution, the client communicates with the AM to obtain the application's status.
  8. After the application is complete, the AM deregisters from RM and shuts down, releasing its resources.

File formats in Hadoop can be roughly divided into row-oriented and column-oriented formats:

Row-oriented: data in the same row is stored together, that is, contiguously. Examples are SequenceFile, MapFile, and Avro datafile. With this layout, even if you only need a small part of a row, the entire row must be read into memory. Lazy deserialization alleviates this to some extent, but the cost of reading the entire row from disk is unavoidable. Row-oriented storage suits cases where entire rows need to be processed at once.

Column-oriented: the file is split by column, and the values of each column are stored together. Examples are Parquet, RCFile, and ORCFile. A column-oriented format makes it possible to skip unneeded columns when reading, so it suits queries that touch only a few fields of each row. However, reading and writing this format requires more memory, because rows have to be buffered in memory (in order to gather one column from multiple rows). It is also unsuitable for streaming writes: once a write fails, the current file cannot be recovered, whereas row-oriented data can be resynchronized to the last sync point after a failed write. This is why Flume uses a row-oriented storage format.

By default, ORC compresses better than Parquet (the compression format can be changed for both, but the Parquet schema is more complex and takes slightly more space; with the same Snappy compression, ORC can reach a compression ratio above 1:3, while Parquet is slightly below 1:3). ORC is generally more efficient than Parquet (it is a Hadoop-native storage format and is more Hive-friendly). Parquet supports nested data formats, whereas ORC does not support nested data types natively (they can be handled indirectly through complex types such as Map<K, V>, which is the "special case" in Article 2 and affects performance). Parquet supports field extension, which ORC does not support natively (the read method can be overridden manually, which also corresponds to the "special case" in Article 2).
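
The difference between the two layouts can be seen with a flat list standing in for the bytes on disk. This is only a conceptual sketch with made-up records; real formats add headers, encodings, and compression on top.

```python
def row_layout(records):
    """Row-oriented: all fields of record 0, then all fields of record 1, ..."""
    return [value for record in records for value in record]


def column_layout(records):
    """Column-oriented: all values of column 0, then all values of column 1, ..."""
    return [value for column in zip(*records) for value in column]


def read_column_from_rows(flat, width, index):
    """Must stride across every row to pick one field out of each."""
    return flat[index::width]


def read_column_from_columns(flat, num_rows, index):
    """One contiguous slice; the other columns are skipped entirely."""
    return flat[index * num_rows:(index + 1) * num_rows]


records = [("alice", 30, "NYC"), ("bob", 25, "LA"), ("carol", 41, "SF")]
rows = row_layout(records)
columns = column_layout(records)
print(rows)
print(columns)
print(read_column_from_columns(columns, num_rows=3, index=1))
```

Reading one column from the row layout touches data spread over the whole file, while the column layout serves it from a single contiguous range. The price is that a writer must buffer several rows before it can emit a column, which is the memory cost mentioned above.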

Apply for resources > Start appMaster > Apply for a Container to run the Task > Distribute the Task > Run the Task > End the Task > Reclaim the Container