Docker Registry Storage

上篇文章写了 Docker Registry Auth 的流程,这里研究一下 Docker Registry Storage。 Docker 镜像仓库的支持支持多种储存类型,主要分为两大类

  • 本地储存
  • 远程储存

本地储存为 filesystem 和 inmemory,然后远程储存就主要是一些云计算厂商的储存业务了。无论那种储存都是基于 StorageDriver 这个接口的实现。

储存结构

.
└── docker
    └── registry
        └── v2
            ├── blobs
            │   └── sha256
            │       ├── 07
            │       │   └── 0766572b4bacfaee9a8eb6bae79e6f6dbcdfac0805c7c6ec8b6c2c0ef097317a
            │       ├── 9a
            │       │   └── 9a597e826a59709a4af34279f496c323d496a79e4c998ee5249a738e391192bb
            │       └── 9c
            │           └── 9ca846b27f6e92f0739af5bba5509357b52be0ce0dd02d216f4dccdacd695a8a
            └── repositories
                ├── alpine
                │   ├── _layers
                │   │   └── sha256
                │   │       ├── 0766572b4bacfaee9a8eb6bae79e6f6dbcdfac0805c7c6ec8b6c2c0ef097317a
                │   │       └── 9ca846b27f6e92f0739af5bba5509357b52be0ce0dd02d216f4dccdacd695a8a
                │   ├── _manifests
                │   │   ├── revisions
                │   │   │   └── sha256
                │   │   │       └── 9a597e826a59709a4af34279f496c323d496a79e4c998ee5249a738e391192bb
                │   │   └── tags
                │   │       └── 3.4
                │   │           ├── current
                │   │           └── index
                │   │               └── sha256
                │   │                   └── 9a597e826a59709a4af34279f496c323d496a79e4c998ee5249a738e391192bb
                │   └── _uploads
                └── library
                    └── alpine
                        ├── _layers
                        │   └── sha256
                        │       ├── 0766572b4bacfaee9a8eb6bae79e6f6dbcdfac0805c7c6ec8b6c2c0ef097317a
                        │       └── 9ca846b27f6e92f0739af5bba5509357b52be0ce0dd02d216f4dccdacd695a8a
                        ├── _manifests
                        │   ├── revisions
                        │   │   └── sha256
                        │   │       └── 9a597e826a59709a4af34279f496c323d496a79e4c998ee5249a738e391192bb
                        │   └── tags
                        │       └── 3.4
                        │           ├── current
                        │           └── index
                        │               └── sha256
                        │                   └── 9a597e826a59709a4af34279f496c323d496a79e4c998ee5249a738e391192bb
                        └── _uploads

这个也就是就是官方 registry 镜像的默认储存目录 /var/lib/registry 的文件结构。上述就是镜像 alpine:3.4 在镜像仓库中的储存结构,docker/registry/v2/repositories/library/alpine 表示镜像的 repository,也是我们俗称的镜像,然后在 docker/registry/v2/repositories/library/alpine/_manifests/tags/ 下面是镜像的所有 tag 信息,tag 目录下存的是 tag 的索引,根据索引,再去 docker/registry/v2/blob 目录下就可以一级一级往下找到对应的镜像层了。一般情况下,我们会将镜像分组在相同的 namespace 下,也就是上述的 library,所以其实在官方的 hub 上面,官方的镜像都是数据 library 这个 namespace 下面的,而每个用户就在自己的 namespace 下,也就是 project。

StorageDriver

type StorageDriver interface {  
    // Name returns the human-readable "name" of the driver, useful in error
    // messages and logging. By convention, this will just be the registration
    // name, but drivers may provide other information here.
    Name() string

    // GetContent retrieves the content stored at "path" as a []byte.
    // This should primarily be used for small objects.
    GetContent(ctx context.Context, path string) ([]byte, error)

    // PutContent stores the []byte content at a location designated by "path".
    // This should primarily be used for small objects.
    PutContent(ctx context.Context, path string, content []byte) error

    // Reader retrieves an io.ReadCloser for the content stored at "path"
    // with a given byte offset.
    // May be used to resume reading a stream by providing a nonzero offset.
    Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error)

    // Writer returns a FileWriter which will store the content written to it
    // at the location designated by "path" after the call to Commit.
    Writer(ctx context.Context, path string, append bool) (FileWriter, error)

    // Stat retrieves the FileInfo for the given path, including the current
    // size in bytes and the creation time.
    Stat(ctx context.Context, path string) (FileInfo, error)

    // List returns a list of the objects that are direct descendants of the
    //given path.
    List(ctx context.Context, path string) ([]string, error)

    // Move moves an object stored at sourcePath to destPath, removing the
    // original object.
    // Note: This may be no more efficient than a copy followed by a delete for
    // many implementations.
    Move(ctx context.Context, sourcePath string, destPath string) error

    // Delete recursively deletes all objects stored at "path" and its subpaths.
    Delete(ctx context.Context, path string) error

    // URLFor returns a URL which may be used to retrieve the content stored at
    // the given path, possibly using the given options.
    // May return an ErrUnsupportedMethod in certain StorageDriver
    // implementations.
    URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error)
}

如果要自己实现镜像的 storage,就得按照 StorageDriver

对于镜像的 push 过程,最主要的函数就是 StorageDriver.Write,这个主要是用来上传大文件的,而 StorageDriver.PutContent 就是一些小文件的上传,比如说 tag 的 index。

StorageDriver.Write 返回一个 FileWriter,这个也需要自己实现,当有文件上传时,会不停的调用 FileWriter.Write ,当文件上传完毕,会调用 FileWriter.Commit

Qiniu Storage

官方并没有提供七牛的 storage,但是因为有 StorageDriver,所以也不妨碍我们自己撸一个使用七牛作为储存的私有镜像仓库来

初始化的 driver 的过程,参照阿里云的 OSS 过程,目测阿里云的 OSS 也是参照 AWS 来的,而且阿里云的 OSS API 接口,跟 AWS 真是神似了。

StorageDriver.GetContent

func (d *driver) GetContent(ctx dockerContext.Context, path string) ([]byte, error) {  
    baseURL := kodo.MakeBaseUrl(d.domain, d.qiniuPath(path))
    policy := kodo.GetPolicy{}
    privateURL := d.client.MakePrivateUrl(baseURL, &policy)

    resp, err := http.Get(privateURL)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    if resp.StatusCode != 200 {
        return nil, storagedriver.PathNotFoundError{Path: path}
    }

    return ioutil.ReadAll(resp.Body)
}

StorageDriver.GetContent 直接使用七牛的 Golang SDK ,根据 path 生成对应的文件的下载路径,然后下载后,直接返回 []byte 就行了。

StorageDriver.Reader

func (d *driver) Reader(ctx dockerContext.Context, path string, offset int64) (io.ReadCloser, error) {

    contents, err := d.GetContent(ctx, path)
    if err != nil {
        return nil, err
    }
    return ioutil.NopCloser(bytes.NewReader(contents[offset:])), nil
}

StorageDriver.Reader 应该是返回一段 offset 偏移量的文件内容,但是七牛貌似并没有提供像阿里云一样的 range 下载的方式,于是只能将整个文件下载下来,然后再手动切分文件了,这样做,要是镜像的某一层很大,会占用很多内存。

StorageDriver.PutContent

func (d *driver) PutContent(ctx dockerContext.Context, path string, contents []byte) error {  
    putExtra := kodo.PutExtra{}
    body := bytes.NewReader(contents)
    err := d.bucket.Put(context.Background(), nil, d.qiniuPath(path), body, int64(len(contents)), &putExtra)
    return err
}

StorageDriver.PutContent 也是直接使用七牛的 SDK 上传小文件。

StorageDriver.Writer

func (d *driver) Writer(ctx dockerContext.Context, path string, append bool) (storagedriver.FileWriter, error) {  
    var w *writer
    qiniuPath := d.qiniuPath(path)
    if !append {
        w = &writer{
            key:           qiniuPath,
            driver:        d,
            buffer:        make([]byte, 0, maxChunkSize),
            uploadCtxList: make([]string, 0, 1),
        }
        d.writerPath[qiniuPath] = w
    } else {
        w = d.writerPath[qiniuPath].(*writer)
        w.closed = false

    }
    return w, nil
}

由于七牛的切片上传方式与阿里云提供的切片有点不同,七牛的需要客户端自己记录上传的片段,所以不得不在 StorageDriver 这里做手脚了,这里将 FileWriter 保存在 StorageDriver 上,以便下一次使用。

StorageDriver.Move

func (d *driver) Move(ctx dockerContext.Context, sourcePath string, destPath string) error {  
    if err := d.bucket.Move(context.Background(), d.qiniuPath(sourcePath), d.qiniuPath(destPath)); err != nil {
        return err
    }
    return nil
}

StorageDriver.Move 在这里的作用是因为 registry 在上传时,会先将文件放在 docker/registry/v2/repositories/library/alpine/_uploads 目录下面,然后等文件正式上传完毕后,在移动到对应的镜像目录下面。所以也需要 StorageDriver.Delete 删除上传的临时文件。

StorageDriver.Stat

func (d *driver) Stat(ctx dockerContext.Context, path string) (storagedriver.FileInfo, error) {  
    entries, commonPrefixes, _, err := d.bucket.List(context.Background(), d.qiniuPath(path), "/", "", 1)
    if err != nil && len(entries) == 0 {

        if err == io.EOF {
            return nil, storagedriver.PathNotFoundError{Path: path}
        }
        return nil, err
    }

    fi := storagedriver.FileInfoFields{
        Path: path,
    }
    if len(entries) == 1 {
        if entries[0].Key != d.qiniuPath(path) {
            fi.IsDir = true
        } else {
            fi.IsDir = false
            fi.Size = entries[0].Fsize
            fi.ModTime = time.Unix(0, entries[0].PutTime)
        }
    } else if len(commonPrefixes) == 1 {
        fi.IsDir = true
    } else {
        return nil, storagedriver.PathNotFoundError{Path: path}
    }
    return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
}

StorageDriver.Stat 可以在上传时判断文件是否已经存在。

FileWriter.Writer

func (w *writer) Write(p []byte) (int, error) {  
    if w.closed {
        return 0, fmt.Errorf("already closed")
    } else if w.committed {
        return 0, fmt.Errorf("already committed")
    } else if w.cancelled {
        return 0, fmt.Errorf("already cancelled")
    }

    if err := w.flushBuffer(); err != nil {
        return 0, err
    }

    w.buffer = append(w.buffer, p...)
    if len(w.buffer) >= maxChunkSize {
        if err := w.flushBuffer(); err != nil {
            return 0, err
        }
    }

    w.size += int64(len(p))

    return len(p), nil
}

func (w *writer) flushBuffer() error {

    for len(w.buffer) >= maxChunkSize {
        if err := w.mkblk(bytes.NewReader(w.buffer[:maxChunkSize]), maxChunkSize); err != nil {
            return err
        }
        w.buffer = w.buffer[maxChunkSize:]
    }
    return nil
}

由于七牛切片上传每块大小最大只能为 4MB ,而 registry 每次上传的一段的大小又是不确定的,所以此处将 registry 每段上传的内容存到 buffer 中,然后当 buffer 里的内容达到 4MB,就将 buffer 中的 4MB 上传到七牛。

FileWriter.Commit

func (w *writer) Commit() error {  
    defer func() {
        w.driver.RemoveWriter(w.key)
    }()
    if w.closed {
        return fmt.Errorf("already closed")
    } else if w.committed {
        return fmt.Errorf("already committed")
    } else if w.cancelled {
        return fmt.Errorf("already cancelled")
    }

    if err := w.flushBuffer(); err != nil {
        return err
    }

    if len(w.buffer) > 0 {
        if err := w.mkblk(bytes.NewReader(w.buffer), len(w.buffer)); err != nil {
            return err
        }
    }

    if err := w.mkfile(); err != nil {
        return err
    }

    w.committed = true
    return nil
}

最终上传完毕后,在 FileWriter.Commit 需要将driver 上的FileWriter 给删掉,不然会内存泄露了,在最后 commit 时,再次确认 buffer 中的内容已经全部上传到了七牛,然后调用七牛的 mkfile 合并块成文件,想当初去七牛面试技术支持的岗位时,还问过我这个问题,当时答得并不咋地。

最后

最终需要在 cmd/registry/main.go 的文件中初始化我们自己的 driver

完整的七牛 Storage 在这里,https://gist.github.com/cloverstd/813a0df6b9eacc459dd84509e902e744