Preface

This article looks at the Sink abstraction in zap, the Go logging library.

Sink

zap/sink.go

// Sink is the union of zapcore.WriteSyncer and io.Closer: an implementation
// must provide Write, Sync and Close.
type Sink interface {
	zapcore.WriteSyncer
	io.Closer
}

// WriteSyncer is an io.Writer that additionally exposes a Sync method
// returning an error.
type WriteSyncer interface {
	io.Writer
	Sync() error
}

// Writer wraps the single Write method, which takes a byte slice and
// returns a byte count and an error.
type Writer interface {
	Write(p []byte) (n int, err error)
}

// Closer wraps the single Close method, which returns an error.
type Closer interface {
	Close() error
}
Copy the code

The Sink interface embeds two interfaces: zapcore.WriteSyncer (Write, Sync) and io.Closer (Close).

RegisterSink

zap/sink.go

const schemeFile = "file" var ( _sinkMutex sync.RWMutex _sinkFactories map[string]func(*url.URL) (Sink, error) // keyed by scheme ) func init() { resetSinkRegistry() } func resetSinkRegistry() { _sinkMutex.Lock() defer _sinkMutex.Unlock() _sinkFactories = map[string]func(*url.URL) (Sink, error){ schemeFile: newFileSink, } } func RegisterSink(scheme string, factory func(*url.URL) (Sink, error)) error { _sinkMutex.Lock() defer _sinkMutex.Unlock() if scheme == "" { return errors.New("can't register a sink factory for empty string") } normalized, err := normalizeScheme(scheme) if err ! = nil { return fmt.Errorf("%q is not a valid scheme: %v", scheme, err) } if _, ok := _sinkFactories[normalized]; ok { return fmt.Errorf("sink factory already registered for scheme %q", normalized) } _sinkFactories[normalized] = factory return nil }Copy the code

RegisterSink registers a sink factory for the given scheme in _sinkFactories; the factory receives the parsed *url.URL. It rejects an empty or invalid scheme, as well as a scheme that already has a factory. resetSinkRegistry (called from init) registers newFileSink under the "file" scheme by default.

newFileSink

zap/sink.go

func newFileSink(u *url.URL) (Sink, error) { if u.User ! = nil { return nil, fmt.Errorf("user and password not allowed with file URLs: got %v", u) } if u.Fragment ! = "" { return nil, fmt.Errorf("fragments not allowed with file URLs: got %v", u) } if u.RawQuery ! = "" { return nil, fmt.Errorf("query parameters not allowed with file URLs: got %v", u) } // Error messages are better if we check hostname and port separately. if u.Port() ! = "" { return nil, fmt.Errorf("ports not allowed with file URLs: got %v", u) } if hn := u.Hostname(); hn ! = "" && hn ! = "localhost" { return nil, fmt.Errorf("file URLs must leave host empty or use localhost: got %v", u) } switch u.Path { case "stdout": return nopCloserSink{os.Stdout}, nil case "stderr": return nopCloserSink{os.Stderr}, nil } return os.OpenFile(u.Path, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) }Copy the code

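newFileSink first validates the URL (no user info, fragment, query, port, or non-localhost host). The special paths "stdout" and "stderr" are wrapped in nopCloserSink; any other path is opened with os.OpenFile, which returns an *os.File. Since *os.File has Write, Sync and Close methods, it implements the Sink interface.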

newSink

zap/sink.go

func newSink(rawURL string) (Sink, error) { u, err := url.Parse(rawURL) if err ! = nil { return nil, fmt.Errorf("can't parse %q as a URL: %v", rawURL, err) } if u.Scheme == "" { u.Scheme = schemeFile } _sinkMutex.RLock() factory, ok := _sinkFactories[u.Scheme] _sinkMutex.RUnlock() if ! ok { return nil, &errSinkNotFound{u.Scheme} } return factory(u) }Copy the code

newSink parses rawURL to determine the scheme, defaulting to "file" when the scheme is empty. It then looks up the matching factory in _sinkFactories and uses it to create and return the sink; if no factory is registered for the scheme it returns an errSinkNotFound error.

open

zap/writer.go

func Open(paths ... string) (zapcore.WriteSyncer, func(), error) { writers, close, err := open(paths) if err ! = nil { return nil, nil, err } writer := CombineWriteSyncers(writers...) return writer, close, nil } func open(paths []string) ([]zapcore.WriteSyncer, func(), error) { writers := make([]zapcore.WriteSyncer, 0, len(paths)) closers := make([]io.Closer, 0, len(paths)) close := func() { for _, c := range closers { c.Close() } } var openErr error for _, path := range paths { sink, err := newSink(path) if err ! = nil { openErr = multierr.Append(openErr, fmt.Errorf("couldn't open sink %q: %v", path, err)) continue } writers = append(writers, sink) closers = append(closers, sink) } if openErr ! = nil { close() return writers, nil, openErr } return writers, close, nil }Copy the code

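zap.Open calls newSink for each path and uses the resulting sinks as zapcore.WriteSyncer values, combining them into a single writer with CombineWriteSyncers. It also returns a function that closes every opened sink.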

Example

func registerSinkDemo() { zap.RegisterSink("mq", mq.NewMqSink) writer, close, Err: = zap. Open (" mq: / / 192.168.99.100:9876 / log ") if err! = nil { panic(err) } defer close() logger := zap.New(zapcore.NewCore(zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), writer, zap.DebugLevel)).Sugar() logger.Info("hello") } type MqWriteSyncer struct { topic string producer rocketmq.Producer ctx context.Context } func (m *MqWriteSyncer) Close() error { return m.producer.Shutdown() } func (m *MqWriteSyncer) Write(p  []byte) (n int, err error) { msg := &primitive.Message{ Topic: m.topic, Body: p, } err = m.producer.SendOneWay(m.ctx, msg) return len(p), err } func (m *MqWriteSyncer) Sync() error { return nil } func NewMqSink(url *url.URL) (zap.Sink, error) { broker := fmt.Sprintf("%s:%s", url.Hostname(), url.Port()) topic := url.Path[1:len(url.Path)] p, _ := rocketmq.NewProducer( producer.WithNameServer([]string{broker}), producer.WithRetry(2), ) err := p.Start() if err ! = nil { fmt.Printf("start producer error: %s", err.Error()) return nil, err } return &MqWriteSyncer{producer: p, ctx: context.Background(), topic: topic}, nil }Copy the code

zap.RegisterSink registers a sink factory for the "mq" scheme, and zap.Open then uses that factory to create an MqWriteSyncer. MqWriteSyncer implements the Write and Sync methods of zapcore.WriteSyncer as well as the Close method required by Sink.

summary

The Sink interface embeds zapcore.WriteSyncer (Write, Sync) and io.Closer (Close). zap.RegisterSink registers a sink factory for a given scheme; zap.Open parses each URL, finds the sink factory for its scheme, and uses it to create the corresponding sink, which serves as the writer.

doc

  • zap