
Un-fork coreos/etcd - bump to v3.2.1

Commit 077f08bf54 temporarily switched etcd to a fork,
pending a pull request being merged and a new release
containing the change.

The pull request was merged and included in
etcd v3.2.0.

This patch bumps etcd to v3.2.1, which contains
some bug fixes on top of v3.2.0.

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
Sebastiaan van Stijn 2017-07-06 13:58:13 -07:00
parent 9d95740dbf
commit cbc480a40c
26 changed files with 387 additions and 215 deletions


@@ -42,7 +42,7 @@ github.com/vishvananda/netlink bd6d5de5ccef2d66b0a26177928d0d8895d7f969
github.com/BurntSushi/toml f706d00e3de6abe700c994cdd545a1a4915af060
github.com/samuel/go-zookeeper d0e0d8e11f318e000a8cc434616d69e329edc374
github.com/deckarep/golang-set ef32fa3046d9f249d399f98ebaf9be944430fd1d
github.com/coreos/etcd ea5389a79f40206170582c1ea076191b8622cb8e https://github.com/aaronlehmann/etcd # for https://github.com/coreos/etcd/pull/7830
github.com/coreos/etcd v3.2.1
github.com/ugorji/go f1f1a805ed361a0e078bb537e4ea78cd37dcf065
github.com/hashicorp/consul v0.5.2
github.com/boltdb/bolt fff57c100f4dea1905678da7e90d92429dff2904


@@ -11,7 +11,7 @@
![etcd Logo](logos/etcd-horizontal-color.png)
etcd is a distributed, consistent key-value store for shared configuration and service discovery, with a focus on being:
etcd is a distributed reliable key-value store for the most critical data of a distributed system, with a focus on being:
* *Simple*: well-defined, user-facing API (gRPC)
* *Secure*: automatic TLS with optional client cert authentication
@@ -37,13 +37,11 @@ See [etcdctl][etcdctl] for a simple command line client.
### Getting etcd
The easiest way to get etcd is to use one of the pre-built release binaries which are available for OSX, Linux, Windows, AppC (ACI), and Docker. Instructions for using these binaries are on the [GitHub releases page][github-release].
The easiest way to get etcd is to use one of the pre-built release binaries which are available for OSX, Linux, Windows, [rkt][rkt], and Docker. Instructions for using these binaries are on the [GitHub releases page][github-release].
For those wanting to try the very latest version, you can [build the latest version of etcd][dl-build] from the `master` branch.
You will first need [*Go*](https://golang.org/) installed on your machine (version 1.6+ is required).
All development occurs on `master`, including new features and bug fixes.
Bug fixes are first targeted at `master` and subsequently ported to release branches, as described in the [branch management][branch-management] guide.
For those wanting to try the very latest version, [build the latest version of etcd][dl-build] from the `master` branch. This first needs [*Go*](https://golang.org/) installed (version 1.8+ is required). All development occurs on `master`, including new features and bug fixes. Bug fixes are first targeted at `master` and subsequently ported to release branches, as described in the [branch management][branch-management] guide.
[rkt]: https://github.com/rkt/rkt/releases/
[github-release]: https://github.com/coreos/etcd/releases/
[branch-management]: ./Documentation/branch_management.md
[dl-build]: ./Documentation/dl_build.md#build-the-latest-version
@@ -77,7 +75,7 @@ That's it! etcd is now running and serving client requests. For more
The [official etcd ports][iana-ports] are 2379 for client requests, and 2380 for peer communication.
[iana-ports]: https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.xhtml?search=etcd
[iana-ports]: http://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.txt
### Running a local etcd cluster
@@ -95,7 +93,7 @@ Every cluster member and proxy accepts key value reads and key value writes.
### Running etcd on Kubernetes
If you want to run etcd cluster on Kubernetes, try [etcd operator](https://github.com/coreos/etcd-operator).
To run an etcd cluster on Kubernetes, try [etcd operator](https://github.com/coreos/etcd-operator).
### Next steps
@@ -105,7 +103,7 @@ Now it's time to dig into the full etcd API and other guides.
- Explore the full gRPC [API][api].
- Set up a [multi-machine cluster][clustering].
- Learn the [config format, env variables and flags][configuration].
- Find [language bindings and tools][libraries-and-tools].
- Find [language bindings and tools][integrations].
- Use TLS to [secure an etcd cluster][security].
- [Tune etcd][tuning].
@@ -113,7 +111,7 @@ Now it's time to dig into the full etcd API and other guides.
[api]: ./Documentation/dev-guide/api_reference_v3.md
[clustering]: ./Documentation/op-guide/clustering.md
[configuration]: ./Documentation/op-guide/configuration.md
[libraries-and-tools]: ./Documentation/libraries-and-tools.md
[integrations]: ./Documentation/integrations.md
[security]: ./Documentation/op-guide/security.md
[tuning]: ./Documentation/tuning.md
@@ -130,10 +128,8 @@ See [CONTRIBUTING](CONTRIBUTING.md) for details on submitting patches and the co
## Reporting bugs
See [reporting bugs](Documentation/reporting_bugs.md) for details about reporting any issue you may encounter.
See [reporting bugs](Documentation/reporting_bugs.md) for details about reporting any issues.
### License
etcd is under the Apache 2.0 license. See the [LICENSE](LICENSE) file for details.


@@ -15,6 +15,7 @@
package client
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
@@ -27,6 +28,8 @@ import (
"sync"
"time"
"github.com/coreos/etcd/version"
"golang.org/x/net/context"
)
@@ -201,6 +204,9 @@ type Client interface {
// returned
SetEndpoints(eps []string) error
// GetVersion retrieves the current etcd server and cluster version
GetVersion(ctx context.Context) (*version.Versions, error)
httpClient
}
@@ -477,6 +483,33 @@ func (c *httpClusterClient) AutoSync(ctx context.Context, interval time.Duration
}
}
func (c *httpClusterClient) GetVersion(ctx context.Context) (*version.Versions, error) {
act := &getAction{Prefix: "/version"}
resp, body, err := c.Do(ctx, act)
if err != nil {
return nil, err
}
switch resp.StatusCode {
case http.StatusOK:
if len(body) == 0 {
return nil, ErrEmptyBody
}
var vresp version.Versions
if err := json.Unmarshal(body, &vresp); err != nil {
return nil, ErrInvalidJSON
}
return &vresp, nil
default:
var etcdErr Error
if err := json.Unmarshal(body, &etcdErr); err != nil {
return nil, ErrInvalidJSON
}
return nil, etcdErr
}
}
type roundTripResponse struct {
resp *http.Response
err error
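The new `GetVersion` method queries the `/version` endpoint and decodes the JSON response. A minimal usage sketch (the endpoint address and timeouts are illustrative assumptions, not part of this change):

```go
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/coreos/etcd/client"
	"golang.org/x/net/context"
)

func main() {
	// Assumed local endpoint; any reachable etcd v2 API endpoint works.
	c, err := client.New(client.Config{
		Endpoints:               []string{"http://127.0.0.1:2379"},
		HeaderTimeoutPerRequest: time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// GetVersion returns the server and cluster versions in one call.
	vers, err := c.GetVersion(ctx)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("server: %s, cluster: %s\n", vers.Server, vers.Cluster)
}
```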


@@ -14,8 +14,27 @@
package client
import (
"github.com/coreos/etcd/pkg/srv"
)
// Discoverer is an interface that wraps the Discover method.
type Discoverer interface {
// Discover looks up the etcd servers for the domain.
Discover(domain string) ([]string, error)
}
type srvDiscover struct{}
// NewSRVDiscover constructs a new Discoverer that uses the stdlib to lookup SRV records.
func NewSRVDiscover() Discoverer {
return &srvDiscover{}
}
func (d *srvDiscover) Discover(domain string) ([]string, error) {
srvs, err := srv.GetClient("etcd-client", domain)
if err != nil {
return nil, err
}
return srvs.Endpoints, nil
}
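With the SRV lookup moved into `pkg/srv`, the `Discoverer` becomes a thin wrapper. A hedged sketch of how a caller might use it (the domain is a placeholder assumed to publish `_etcd-client._tcp` SRV records):

```go
package main

import (
	"fmt"
	"log"

	"github.com/coreos/etcd/client"
)

func main() {
	// "example.com" is a placeholder domain with etcd SRV records.
	d := client.NewSRVDiscover()
	endpoints, err := d.Discover("example.com")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(endpoints) // e.g. [https://etcd0.example.com:2379 ...]
}
```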


@@ -1,65 +0,0 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"fmt"
"net"
"net/url"
)
var (
// indirection for testing
lookupSRV = net.LookupSRV
)
type srvDiscover struct{}
// NewSRVDiscover constructs a new Discoverer that uses the stdlib to lookup SRV records.
func NewSRVDiscover() Discoverer {
return &srvDiscover{}
}
// Discover looks up the etcd servers for the domain.
func (d *srvDiscover) Discover(domain string) ([]string, error) {
var urls []*url.URL
updateURLs := func(service, scheme string) error {
_, addrs, err := lookupSRV(service, "tcp", domain)
if err != nil {
return err
}
for _, srv := range addrs {
urls = append(urls, &url.URL{
Scheme: scheme,
Host: net.JoinHostPort(srv.Target, fmt.Sprintf("%d", srv.Port)),
})
}
return nil
}
errHTTPS := updateURLs("etcd-client-ssl", "https")
errHTTP := updateURLs("etcd-client", "http")
if errHTTPS != nil && errHTTP != nil {
return nil, fmt.Errorf("dns lookup errors: %s and %s", errHTTPS, errHTTP)
}
endpoints := make([]string, len(urls))
for i := range urls {
endpoints[i] = urls[i].String()
}
return endpoints, nil
}


@@ -17,9 +17,10 @@ package fileutil
import (
"fmt"
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"sort"
"github.com/coreos/pkg/capnslog"
@@ -39,7 +40,7 @@ var (
// IsDirWriteable checks if dir is writable by writing and removing a file
// to dir. It returns nil if dir is writable.
func IsDirWriteable(dir string) error {
f := path.Join(dir, ".touch")
f := filepath.Join(dir, ".touch")
if err := ioutil.WriteFile(f, []byte(""), PrivateFileMode); err != nil {
return err
}
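The switch from `path` to `filepath` here (and in several files below) makes joins use the platform separator; `path` always uses forward slashes, which is wrong for Windows file paths. A small illustration of the documented difference:

```go
package main

import (
	"fmt"
	"path"
	"path/filepath"
)

func main() {
	fmt.Println(path.Join("dir", ".touch"))     // always "dir/.touch"
	fmt.Println(filepath.Join("dir", ".touch")) // "dir\.touch" on Windows
}
```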
@@ -101,11 +102,11 @@ func Exist(name string) bool {
// shorten the length of the file.
func ZeroToEnd(f *os.File) error {
// TODO: support FALLOC_FL_ZERO_RANGE
off, err := f.Seek(0, os.SEEK_CUR)
off, err := f.Seek(0, io.SeekCurrent)
if err != nil {
return err
}
lenf, lerr := f.Seek(0, os.SEEK_END)
lenf, lerr := f.Seek(0, io.SeekEnd)
if lerr != nil {
return lerr
}
@@ -116,6 +117,6 @@ func ZeroToEnd(f *os.File) error {
if err = Preallocate(f, lenf, true); err != nil {
return err
}
_, err = f.Seek(off, os.SEEK_SET)
_, err = f.Seek(off, io.SeekStart)
return err
}
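`os.SEEK_SET`, `os.SEEK_CUR`, and `os.SEEK_END` were deprecated in Go 1.7 in favor of the `io` seek constants used above; the numeric whence values are identical, so the change is purely cosmetic. A quick check:

```go
package main

import (
	"fmt"
	"io"
	"os"
)

func main() {
	// Same whence codes (0, 1, 2), new names.
	fmt.Println(io.SeekStart == os.SEEK_SET)   // true
	fmt.Println(io.SeekCurrent == os.SEEK_CUR) // true
	fmt.Println(io.SeekEnd == os.SEEK_END)     // true
}
```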


@@ -17,6 +17,7 @@
package fileutil
import (
"io"
"os"
"syscall"
)
@@ -36,7 +37,7 @@ const (
var (
wrlck = syscall.Flock_t{
Type: syscall.F_WRLCK,
Whence: int16(os.SEEK_SET),
Whence: int16(io.SeekStart),
Start: 0,
Len: 0,
}


@@ -14,7 +14,10 @@
package fileutil
import "os"
import (
"io"
"os"
)
// Preallocate tries to allocate the space for given
// file. This operation is only supported on linux by a
@@ -22,6 +25,10 @@ import "os"
// If the operation is unsupported, no error will be returned.
// Otherwise, the error encountered will be returned.
func Preallocate(f *os.File, sizeInBytes int64, extendFile bool) error {
if sizeInBytes == 0 {
// fallocate will return EINVAL if length is 0; skip
return nil
}
if extendFile {
return preallocExtend(f, sizeInBytes)
}
@@ -29,15 +36,15 @@ func Preallocate(f *os.File, sizeInBytes int64, extendFile bool) error {
}
func preallocExtendTrunc(f *os.File, sizeInBytes int64) error {
curOff, err := f.Seek(0, os.SEEK_CUR)
curOff, err := f.Seek(0, io.SeekCurrent)
if err != nil {
return err
}
size, err := f.Seek(sizeInBytes, os.SEEK_END)
size, err := f.Seek(sizeInBytes, io.SeekEnd)
if err != nil {
return err
}
if _, err = f.Seek(curOff, os.SEEK_SET); err != nil {
if _, err = f.Seek(curOff, io.SeekStart); err != nil {
return err
}
if sizeInBytes > size {


@@ -16,7 +16,7 @@ package fileutil
import (
"os"
"path"
"path/filepath"
"sort"
"strings"
"time"
@@ -45,7 +45,7 @@ func purgeFile(dirname string, suffix string, max uint, interval time.Duration,
sort.Strings(newfnames)
fnames = newfnames
for len(newfnames) > int(max) {
f := path.Join(dirname, newfnames[0])
f := filepath.Join(dirname, newfnames[0])
l, err := TryLockFile(f, os.O_WRONLY, PrivateFileMode)
if err != nil {
break


@@ -32,8 +32,8 @@ const (
// a node member ID.
//
// The initial id is in this format:
// High order byte is memberID, next 5 bytes are from timestamp,
// and low order 2 bytes are 0s.
// High order 2 bytes are from memberID, next 5 bytes are from timestamp,
// and low order one byte is a counter.
// | prefix | suffix |
// | 2 bytes | 5 bytes | 1 byte |
// | memberID | timestamp | cnt |
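Based on the corrected comment, an illustrative sketch of packing an ID with that layout (a hedged reconstruction of the documented bit layout, not the etcd implementation):

```go
// Layout: | 2 bytes memberID | 5 bytes timestamp | 1 byte counter |
// All names and shifts here are illustrative.
func packID(memberID uint64, nowNano uint64, cnt uint8) uint64 {
	prefix := memberID & 0xffff          // keep the low 2 bytes of memberID
	ts := (nowNano >> 24) & 0xffffffffff // keep 5 bytes of the timestamp
	return prefix<<48 | ts<<8 | uint64(cnt)
}
```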

vendor/github.com/coreos/etcd/pkg/srv/srv.go (generated, vendored, new file)

@@ -0,0 +1,140 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package srv looks up DNS SRV records.
package srv
import (
"fmt"
"net"
"net/url"
"strings"
"github.com/coreos/etcd/pkg/types"
)
var (
// indirection for testing
lookupSRV = net.LookupSRV // net.DefaultResolver.LookupSRV when ctxs don't conflict
resolveTCPAddr = net.ResolveTCPAddr
)
// GetCluster gets the cluster information via DNS discovery.
// Also sees each entry as a separate instance.
func GetCluster(service, name, dns string, apurls types.URLs) ([]string, error) {
tempName := int(0)
tcp2ap := make(map[string]url.URL)
// First, resolve the apurls
for _, url := range apurls {
tcpAddr, err := resolveTCPAddr("tcp", url.Host)
if err != nil {
return nil, err
}
tcp2ap[tcpAddr.String()] = url
}
stringParts := []string{}
updateNodeMap := func(service, scheme string) error {
_, addrs, err := lookupSRV(service, "tcp", dns)
if err != nil {
return err
}
for _, srv := range addrs {
port := fmt.Sprintf("%d", srv.Port)
host := net.JoinHostPort(srv.Target, port)
tcpAddr, terr := resolveTCPAddr("tcp", host)
if terr != nil {
err = terr
continue
}
n := ""
url, ok := tcp2ap[tcpAddr.String()]
if ok {
n = name
}
if n == "" {
n = fmt.Sprintf("%d", tempName)
tempName++
}
// SRV records have a trailing dot but URL shouldn't.
shortHost := strings.TrimSuffix(srv.Target, ".")
urlHost := net.JoinHostPort(shortHost, port)
stringParts = append(stringParts, fmt.Sprintf("%s=%s://%s", n, scheme, urlHost))
if ok && url.Scheme != scheme {
err = fmt.Errorf("bootstrap at %s from DNS for %s has scheme mismatch with expected peer %s", scheme+"://"+urlHost, service, url.String())
}
}
if len(stringParts) == 0 {
return err
}
return nil
}
failCount := 0
err := updateNodeMap(service+"-ssl", "https")
srvErr := make([]string, 2)
if err != nil {
srvErr[0] = fmt.Sprintf("error querying DNS SRV records for _%s-ssl %s", service, err)
failCount++
}
err = updateNodeMap(service, "http")
if err != nil {
srvErr[1] = fmt.Sprintf("error querying DNS SRV records for _%s %s", service, err)
failCount++
}
if failCount == 2 {
return nil, fmt.Errorf("srv: too many errors querying DNS SRV records (%q, %q)", srvErr[0], srvErr[1])
}
return stringParts, nil
}
type SRVClients struct {
Endpoints []string
SRVs []*net.SRV
}
// GetClient looks up the client endpoints for a service and domain.
func GetClient(service, domain string) (*SRVClients, error) {
var urls []*url.URL
var srvs []*net.SRV
updateURLs := func(service, scheme string) error {
_, addrs, err := lookupSRV(service, "tcp", domain)
if err != nil {
return err
}
for _, srv := range addrs {
urls = append(urls, &url.URL{
Scheme: scheme,
Host: net.JoinHostPort(srv.Target, fmt.Sprintf("%d", srv.Port)),
})
}
srvs = append(srvs, addrs...)
return nil
}
errHTTPS := updateURLs(service+"-ssl", "https")
errHTTP := updateURLs(service, "http")
if errHTTPS != nil && errHTTP != nil {
return nil, fmt.Errorf("dns lookup errors: %s and %s", errHTTPS, errHTTP)
}
endpoints := make([]string, len(urls))
for i := range urls {
endpoints[i] = urls[i].String()
}
return &SRVClients{Endpoints: endpoints, SRVs: srvs}, nil
}
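A hedged sketch of calling the two entry points of the new package (the domain, member name, and URLs are placeholders):

```go
package main

import (
	"fmt"
	"log"

	"github.com/coreos/etcd/pkg/srv"
	"github.com/coreos/etcd/pkg/types"
)

func main() {
	// Client endpoint discovery via _etcd-client._tcp / _etcd-client-ssl._tcp.
	clients, err := srv.GetClient("etcd-client", "example.com")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(clients.Endpoints)

	// Peer bootstrap: match this member's advertised peer URLs to SRV entries.
	apurls, err := types.NewURLs([]string{"http://10.0.0.1:2380"})
	if err != nil {
		log.Fatal(err)
	}
	peers, err := srv.GetCluster("etcd-server", "infra0", "example.com", apurls)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(peers) // e.g. [infra0=http://etcd0.example.com:2380 ...]
}
```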


@@ -13,9 +13,7 @@ To keep the codebase small as well as provide flexibility, the library only impl
In order to easily test the Raft library, its behavior should be deterministic. To achieve this determinism, the library models Raft as a state machine. The state machine takes a `Message` as input. A message can either be a local timer update or a network message sent from a remote peer. The state machine's output is a 3-tuple `{[]Messages, []LogEntries, NextState}` consisting of an array of `Messages`, `log entries`, and `Raft state changes`. For state machines with the same state, the same state machine input should always generate the same state machine output.
A simple example application, _raftexample_, is also available to help illustrate
how to use this package in practice:
https://github.com/coreos/etcd/tree/master/contrib/raftexample
A simple example application, _raftexample_, is also available to help illustrate how to use this package in practice: https://github.com/coreos/etcd/tree/master/contrib/raftexample
# Features
@@ -51,11 +49,11 @@ This raft implementation also includes a few optional enhancements:
- [etcd](https://github.com/coreos/etcd) A distributed reliable key-value store
- [tikv](https://github.com/pingcap/tikv) A Distributed transactional key value database powered by Rust and Raft
- [swarmkit](https://github.com/docker/swarmkit) A toolkit for orchestrating distributed systems at any scale.
- [chain core](https://github.com/chain/chain) Software for operating permissioned, multi-asset blockchain networks
## Usage
The primary object in raft is a Node. You either start a Node from scratch
using raft.StartNode or start a Node from some initial state using raft.RestartNode.
The primary object in raft is a Node. Either start a Node from scratch using raft.StartNode or start a Node from some initial state using raft.RestartNode.
To start a three-node cluster
```go
@@ -73,7 +71,7 @@ To start a three-node cluster
n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}})
```
You can start a single node cluster, like so:
Start a single node cluster, like so:
```go
// Create storage and config as shown above.
// Set peer list to itself, so this node can become the leader of this single-node cluster.
@@ -81,7 +79,7 @@ You can start a single node cluster, like so:
n := raft.StartNode(c, peers)
```
To allow a new node to join this cluster, do not pass in any peers. First, you need add the node to the existing cluster by calling `ProposeConfChange` on any existing node inside the cluster. Then, you can start the node with empty peer list, like so:
To allow a new node to join this cluster, do not pass in any peers. First, add the node to the existing cluster by calling `ProposeConfChange` on any existing node inside the cluster. Then, start the node with an empty peer list, like so:
```go
// Create storage and config as shown above.
n := raft.StartNode(c, nil)
@@ -110,46 +108,21 @@ To restart a node from previous state:
n := raft.RestartNode(c)
```
Now that you are holding onto a Node you have a few responsibilities:
After creating a Node, the user has a few responsibilities:
First, you must read from the Node.Ready() channel and process the updates
it contains. These steps may be performed in parallel, except as noted in step
2.
First, read from the Node.Ready() channel and process the updates it contains. These steps may be performed in parallel, except as noted in step 2.
1. Write HardState, Entries, and Snapshot to persistent storage if they are
not empty. Note that when writing an Entry with Index i, any
previously-persisted entries with Index >= i must be discarded.
1. Write HardState, Entries, and Snapshot to persistent storage if they are not empty. Note that when writing an Entry with Index i, any previously-persisted entries with Index >= i must be discarded.
2. Send all Messages to the nodes named in the To field. It is important that
no messages be sent until the latest HardState has been persisted to disk,
and all Entries written by any previous Ready batch (Messages may be sent while
entries from the same batch are being persisted). To reduce the I/O latency, an
optimization can be applied to make leader write to disk in parallel with its
followers (as explained at section 10.2.1 in Raft thesis). If any Message has type
MsgSnap, call Node.ReportSnapshot() after it has been sent (these messages may be
large). Note: Marshalling messages is not thread-safe; it is important that you
make sure that no new entries are persisted while marshalling.
The easiest way to achieve this is to serialise the messages directly inside
your main raft loop.
2. Send all Messages to the nodes named in the To field. It is important that no messages be sent until the latest HardState has been persisted to disk, and all Entries written by any previous Ready batch (Messages may be sent while entries from the same batch are being persisted). To reduce the I/O latency, an optimization can be applied to make leader write to disk in parallel with its followers (as explained at section 10.2.1 in Raft thesis). If any Message has type MsgSnap, call Node.ReportSnapshot() after it has been sent (these messages may be large). Note: Marshalling messages is not thread-safe; it is important to make sure that no new entries are persisted while marshalling. The easiest way to achieve this is to serialise the messages directly inside the main raft loop.
3. Apply Snapshot (if any) and CommittedEntries to the state machine.
If any committed Entry has Type EntryConfChange, call Node.ApplyConfChange()
to apply it to the node. The configuration change may be cancelled at this point
by setting the NodeID field to zero before calling ApplyConfChange
(but ApplyConfChange must be called one way or the other, and the decision to cancel
must be based solely on the state machine and not external information such as
the observed health of the node).
3. Apply Snapshot (if any) and CommittedEntries to the state machine. If any committed Entry has Type EntryConfChange, call Node.ApplyConfChange() to apply it to the node. The configuration change may be cancelled at this point by setting the NodeID field to zero before calling ApplyConfChange (but ApplyConfChange must be called one way or the other, and the decision to cancel must be based solely on the state machine and not external information such as the observed health of the node).
4. Call Node.Advance() to signal readiness for the next batch of updates.
This may be done at any time after step 1, although all updates must be processed
in the order they were returned by Ready.
4. Call Node.Advance() to signal readiness for the next batch of updates. This may be done at any time after step 1, although all updates must be processed in the order they were returned by Ready.
Second, all persisted log entries must be made available via an
implementation of the Storage interface. The provided MemoryStorage
type can be used for this (if you repopulate its state upon a
restart), or you can supply your own disk-backed implementation.
Second, all persisted log entries must be made available via an implementation of the Storage interface. The provided MemoryStorage type can be used for this (if repopulating its state upon a restart), or a custom disk-backed implementation can be supplied.
Third, when you receive a message from another node, pass it to Node.Step:
Third, after receiving a message from another node, pass it to Node.Step:
```go
func recvRaftRPC(ctx context.Context, m raftpb.Message) {
@ -157,10 +130,7 @@ Third, when you receive a message from another node, pass it to Node.Step:
}
```
Finally, you need to call `Node.Tick()` at regular intervals (probably
via a `time.Ticker`). Raft has two important timeouts: heartbeat and the
election timeout. However, internally to the raft package time is
represented by an abstract "tick".
Finally, call `Node.Tick()` at regular intervals (probably via a `time.Ticker`). Raft has two important timeouts: heartbeat and the election timeout. However, internally to the raft package time is represented by an abstract "tick".
The total state machine handling loop will look something like this:
@@ -190,16 +160,13 @@ The total state machine handling loop will look something like this:
}
```
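For reference, the loop whose tail appears above combines the four responsibilities just listed; a condensed sketch (helpers like `saveToStorage`, `send`, and `process` are application-side stand-ins):

```go
for {
	select {
	case <-s.Ticker:
		n.Tick() // drive raft's internal clock
	case rd := <-s.Node.Ready():
		saveToStorage(rd.HardState, rd.Entries, rd.Snapshot) // step 1
		send(rd.Messages)                                    // step 2
		if !raft.IsEmptySnap(rd.Snapshot) {
			processSnapshot(rd.Snapshot) // step 3
		}
		for _, entry := range rd.CommittedEntries {
			process(entry)
			if entry.Type == raftpb.EntryConfChange {
				var cc raftpb.ConfChange
				cc.Unmarshal(entry.Data)
				s.Node.ApplyConfChange(cc)
			}
		}
		s.Node.Advance() // step 4
	case <-s.done:
		return
	}
}
```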
To propose changes to the state machine from your node take your application
data, serialize it into a byte slice and call:
To propose changes to the state machine from the node, take application data, serialize it into a byte slice, and call:
```go
n.Propose(ctx, data)
```
If the proposal is committed, data will appear in committed entries with type
raftpb.EntryNormal. There is no guarantee that a proposed command will be
committed; you may have to re-propose after a timeout.
If the proposal is committed, data will appear in committed entries with type raftpb.EntryNormal. There is no guarantee that a proposed command will be committed; the command may have to be reproposed after a timeout.
To add or remove node in a cluster, build ConfChange struct 'cc' and call:
@@ -207,8 +174,7 @@ To add or remove node in a cluster, build ConfChange struct 'cc' and call:
n.ProposeConfChange(ctx, cc)
```
After config change is committed, some committed entry with type
raftpb.EntryConfChange will be returned. You must apply it to node through:
After config change is committed, some committed entry with type raftpb.EntryConfChange will be returned. This must be applied to node through:
```go
var cc raftpb.ConfChange
@@ -223,25 +189,8 @@ may be reused. Node IDs must be non-zero.
## Implementation notes
This implementation is up to date with the final Raft thesis
(https://ramcloud.stanford.edu/~ongaro/thesis.pdf), although our
implementation of the membership change protocol differs somewhat from
that described in chapter 4. The key invariant that membership changes
happen one node at a time is preserved, but in our implementation the
membership change takes effect when its entry is applied, not when it
is added to the log (so the entry is committed under the old
membership instead of the new). This is equivalent in terms of safety,
since the old and new configurations are guaranteed to overlap.
This implementation is up to date with the final Raft thesis (https://ramcloud.stanford.edu/~ongaro/thesis.pdf), although this implementation of the membership change protocol differs somewhat from that described in chapter 4. The key invariant that membership changes happen one node at a time is preserved, but in our implementation the membership change takes effect when its entry is applied, not when it is added to the log (so the entry is committed under the old membership instead of the new). This is equivalent in terms of safety, since the old and new configurations are guaranteed to overlap.
To ensure that we do not attempt to commit two membership changes at
once by matching log positions (which would be unsafe since they
should have different quorum requirements), we simply disallow any
proposed membership change while any uncommitted change appears in
the leader's log.
To ensure there is no attempt to commit two membership changes at once by matching log positions (which would be unsafe since they should have different quorum requirements), any proposed membership change is simply disallowed while any uncommitted change appears in the leader's log.
This approach introduces a problem when you try to remove a member
from a two-member cluster: If one of the members dies before the
other one receives the commit of the confchange entry, then the member
cannot be removed any more since the cluster cannot make progress.
For this reason it is highly recommended to use three or more nodes in
every cluster.
This approach introduces a problem when removing a member from a two-member cluster: If one of the members dies before the other one receives the commit of the confchange entry, then the member cannot be removed any more since the cluster cannot make progress. For this reason it is highly recommended to use three or more nodes in every cluster.


@@ -85,6 +85,26 @@ func (u *unstable) stableTo(i, t uint64) {
if gt == t && i >= u.offset {
u.entries = u.entries[i+1-u.offset:]
u.offset = i + 1
u.shrinkEntriesArray()
}
}
// shrinkEntriesArray discards the underlying array used by the entries slice
// if most of it isn't being used. This avoids holding references to a bunch of
// potentially large entries that aren't needed anymore. Simply clearing the
// entries wouldn't be safe because clients might still be using them.
func (u *unstable) shrinkEntriesArray() {
// We replace the array if we're using less than half of the space in
// it. This number is fairly arbitrary, chosen as an attempt to balance
// memory usage vs number of allocations. It could probably be improved
// with some focused tuning.
const lenMultiple = 2
if len(u.entries) == 0 {
u.entries = nil
} else if len(u.entries)*lenMultiple < cap(u.entries) {
newEntries := make([]pb.Entry, len(u.entries))
copy(newEntries, u.entries)
u.entries = newEntries
}
}
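The same copy-to-compact idiom in isolation: reslicing from the front keeps the whole backing array reachable, so only a copy into a right-sized slice lets the old array be collected. A hedged stand-alone sketch:

```go
package raftexample

import pb "github.com/coreos/etcd/raft/raftpb"

// shrinkDemo mirrors the idiom above with illustrative inputs.
func shrinkDemo(entries []pb.Entry) []pb.Entry {
	entries = entries[len(entries)/2:] // e.g. drop already-stable entries
	const lenMultiple = 2
	if len(entries)*lenMultiple < cap(entries) {
		compact := make([]pb.Entry, len(entries))
		copy(compact, entries)
		entries = compact // the old, larger array becomes collectable
	}
	return entries
}
```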

View file

@@ -83,6 +83,10 @@ type Ready struct {
// If it contains a MsgSnap message, the application MUST report back to raft
// when the snapshot has been received or has failed by calling ReportSnapshot.
Messages []pb.Message
// MustSync indicates whether the HardState and Entries must be synchronously
// written to disk or if an asynchronous write is permissible.
MustSync bool
}
func isHardStateEqual(a, b pb.HardState) bool {
@@ -517,5 +521,17 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
if len(r.readStates) != 0 {
rd.ReadStates = r.readStates
}
rd.MustSync = MustSync(rd.HardState, prevHardSt, len(rd.Entries))
return rd
}
// MustSync returns true if the hard state and count of Raft entries indicate
// that a synchronous write to persistent storage is required.
func MustSync(st, prevst pb.HardState, entsnum int) bool {
// Persistent state on all servers:
// (Updated on stable storage before responding to RPCs)
// currentTerm
// votedFor
// log entries[]
return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
}
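A hedged sketch of how a write path can branch on the newly exported helper, fsyncing only when the term or vote changed or new entries were appended (`appendRecords` is an assumed application-side helper, not etcd code):

```go
package walexample

import (
	"os"

	"github.com/coreos/etcd/raft"
	pb "github.com/coreos/etcd/raft/raftpb"
)

// save appends records and fsyncs only when raft.MustSync says so.
func save(logFile *os.File, st, prevst pb.HardState, ents []pb.Entry) error {
	if err := appendRecords(logFile, st, ents); err != nil {
		return err
	}
	if raft.MustSync(st, prevst, len(ents)) {
		return logFile.Sync() // make state durable before replying to RPCs
	}
	return nil
}

// appendRecords is a stub; a real implementation would encode and
// write log records here.
func appendRecords(f *os.File, st pb.HardState, ents []pb.Entry) error {
	return nil
}
```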

View file

@@ -823,6 +823,11 @@ func stepLeader(r *raft, m pb.Message) {
return
case pb.MsgReadIndex:
if r.quorum() > 1 {
if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
// Reject read only request when this leader has not committed any log entry at its term.
return
}
// thinking: use an internally defined context instead of the user-given context.
// We can express this in terms of the term and index instead of a user-supplied value.
// This would allow multiple reads to piggyback on the same message.


@@ -1847,7 +1847,7 @@ func init() { proto.RegisterFile("raft.proto", fileDescriptorRaft) }
var fileDescriptorRaft = []byte{
// 790 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x64, 0x54, 0xcd, 0x6e, 0xdb, 0x46,
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x54, 0xcd, 0x6e, 0xdb, 0x46,
0x10, 0x16, 0x29, 0xea, 0x6f, 0x28, 0xcb, 0xab, 0xb5, 0x5a, 0x2c, 0x0c, 0x43, 0x55, 0x85, 0x1e,
0x04, 0x17, 0x76, 0x5b, 0x1d, 0x7a, 0xe8, 0xcd, 0x96, 0x0a, 0x58, 0x40, 0x65, 0xb8, 0xb2, 0xdc,
0x43, 0x83, 0x20, 0x58, 0x8b, 0x2b, 0x4a, 0x89, 0xc9, 0x25, 0x96, 0x2b, 0xc7, 0xbe, 0x04, 0x79,


@@ -100,7 +100,7 @@ func (ro *readOnly) advance(m pb.Message) []*readIndexStatus {
if found {
ro.readIndexQueue = ro.readIndexQueue[i:]
for _, rs := range rss {
delete(ro.pendingReadIndex, string(rs.req.Context))
delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data))
}
return rss
}


@@ -15,15 +15,18 @@
package snap
import (
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"github.com/coreos/etcd/pkg/fileutil"
)
var ErrNoDBSnapshot = errors.New("snap: snapshot file doesn't exist")
// SaveDBFrom saves snapshot of the database from the given reader. It
// guarantees the save operation is atomic.
func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
@@ -41,7 +44,7 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
os.Remove(f.Name())
return n, err
}
fn := path.Join(s.dir, fmt.Sprintf("%016x.snap.db", id))
fn := s.dbFilePath(id)
if fileutil.Exist(fn) {
os.Remove(f.Name())
return n, nil
@@ -60,15 +63,15 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
// DBFilePath returns the file path for the snapshot of the database with
// given id. If the snapshot does not exist, it returns error.
func (s *Snapshotter) DBFilePath(id uint64) (string, error) {
fns, err := fileutil.ReadDir(s.dir)
if err != nil {
if _, err := fileutil.ReadDir(s.dir); err != nil {
return "", err
}
wfn := fmt.Sprintf("%016x.snap.db", id)
for _, fn := range fns {
if fn == wfn {
return path.Join(s.dir, fn), nil
if fn := s.dbFilePath(id); fileutil.Exist(fn) {
return fn, nil
}
return "", ErrNoDBSnapshot
}
return "", fmt.Errorf("snap: snapshot file doesn't exist")
func (s *Snapshotter) dbFilePath(id uint64) string {
return filepath.Join(s.dir, fmt.Sprintf("%016x.snap.db", id))
}


@@ -342,7 +342,7 @@ func init() { proto.RegisterFile("snap.proto", fileDescriptorSnap) }
var fileDescriptorSnap = []byte{
// 126 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2a, 0xce, 0x4b, 0x2c,
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2a, 0xce, 0x4b, 0x2c,
0xd0, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x03, 0xb1, 0x0b, 0x92, 0xa4, 0x44, 0xd2, 0xf3,
0xd3, 0xf3, 0xc1, 0x42, 0xfa, 0x20, 0x16, 0x44, 0x56, 0xc9, 0x8c, 0x8b, 0x03, 0x24, 0x5f, 0x9c,
0x91, 0x5f, 0x22, 0x24, 0xc6, 0xc5, 0x9c, 0x5c, 0x94, 0x2c, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0xeb,


@@ -21,7 +21,7 @@ import (
"hash/crc32"
"io/ioutil"
"os"
"path"
"path/filepath"
"sort"
"strings"
"time"
@@ -84,13 +84,13 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
marshallingDurations.Observe(float64(time.Since(start)) / float64(time.Second))
}
err = pioutil.WriteAndSyncFile(path.Join(s.dir, fname), d, 0666)
err = pioutil.WriteAndSyncFile(filepath.Join(s.dir, fname), d, 0666)
if err == nil {
saveDurations.Observe(float64(time.Since(start)) / float64(time.Second))
} else {
err1 := os.Remove(path.Join(s.dir, fname))
err1 := os.Remove(filepath.Join(s.dir, fname))
if err1 != nil {
plog.Errorf("failed to remove broken snapshot file %s", path.Join(s.dir, fname))
plog.Errorf("failed to remove broken snapshot file %s", filepath.Join(s.dir, fname))
}
}
return err
@@ -114,7 +114,7 @@ func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
}
func loadSnap(dir, name string) (*raftpb.Snapshot, error) {
fpath := path.Join(dir, name)
fpath := filepath.Join(dir, name)
snap, err := Read(fpath)
if err != nil {
renameBroken(fpath)

vendor/github.com/coreos/etcd/version/version.go (generated, vendored, new file)

@@ -0,0 +1,56 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package version implements etcd version parsing and contains latest version
// information.
package version
import (
"fmt"
"strings"
"github.com/coreos/go-semver/semver"
)
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "3.0.0"
Version = "3.2.1"
APIVersion = "unknown"
// Git SHA Value will be set during build
GitSHA = "Not provided (use ./build instead of go build)"
)
func init() {
ver, err := semver.NewVersion(Version)
if err == nil {
APIVersion = fmt.Sprintf("%d.%d", ver.Major, ver.Minor)
}
}
type Versions struct {
Server string `json:"etcdserver"`
Cluster string `json:"etcdcluster"`
// TODO: raft state machine version
}
// Cluster only keeps the major.minor.
func Cluster(v string) string {
vs := strings.Split(v, ".")
if len(vs) <= 2 {
return v
}
return fmt.Sprintf("%s.%s", vs[0], vs[1])
}
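Usage of the helpers in this new file is straightforward; a small sketch based on the code above:

```go
package main

import (
	"fmt"

	"github.com/coreos/etcd/version"
)

func main() {
	fmt.Println(version.Cluster("3.2.1")) // "3.2" (keeps major.minor)
	fmt.Println(version.Cluster("3.2"))   // "3.2" (already short enough)
	fmt.Println(version.APIVersion)       // "3.2", derived in init()
}
```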


@@ -52,7 +52,7 @@ func newEncoder(w io.Writer, prevCrc uint32, pageOffset int) *encoder {
// newFileEncoder creates a new encoder with current file offset for the page writer.
func newFileEncoder(f *os.File, prevCrc uint32) (*encoder, error) {
offset, err := f.Seek(0, os.SEEK_CUR)
offset, err := f.Seek(0, io.SeekCurrent)
if err != nil {
return nil, err
}


@@ -17,7 +17,7 @@ package wal
import (
"fmt"
"os"
"path"
"path/filepath"
"github.com/coreos/etcd/pkg/fileutil"
)
@@ -65,7 +65,7 @@ func (fp *filePipeline) Close() error {
func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) {
// count % 2 so this file isn't the same as the one last published
fpath := path.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2))
fpath := filepath.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2))
if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
return nil, err
}


@@ -17,7 +17,7 @@ package wal
import (
"io"
"os"
"path"
"path/filepath"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/wal/walpb"
@@ -62,7 +62,7 @@ func Repair(dirpath string) bool {
}
defer bf.Close()
if _, err = f.Seek(0, os.SEEK_SET); err != nil {
if _, err = f.Seek(0, io.SeekStart); err != nil {
plog.Errorf("could not repair %v, failed to read file", f.Name())
return false
}
@@ -94,6 +94,6 @@ func openLast(dirpath string) (*fileutil.LockedFile, error) {
if err != nil {
return nil, err
}
last := path.Join(dirpath, names[len(names)-1])
last := filepath.Join(dirpath, names[len(names)-1])
return fileutil.LockFile(last, os.O_RDWR, fileutil.PrivateFileMode)
}


@@ -21,7 +21,7 @@ import (
"hash/crc32"
"io"
"os"
"path"
"path/filepath"
"sync"
"time"
@@ -97,7 +97,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
}
// keep temporary wal directory so WAL initialization appears atomic
tmpdirpath := path.Clean(dirpath) + ".tmp"
tmpdirpath := filepath.Clean(dirpath) + ".tmp"
if fileutil.Exist(tmpdirpath) {
if err := os.RemoveAll(tmpdirpath); err != nil {
return nil, err
@@ -107,12 +107,12 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
return nil, err
}
p := path.Join(tmpdirpath, walName(0, 0))
p := filepath.Join(tmpdirpath, walName(0, 0))
f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
if err != nil {
return nil, err
}
if _, err = f.Seek(0, os.SEEK_END); err != nil {
if _, err = f.Seek(0, io.SeekEnd); err != nil {
return nil, err
}
if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil {
@@ -143,7 +143,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
}
// directory was renamed; sync parent dir to persist rename
pdir, perr := fileutil.OpenDir(path.Dir(w.dir))
pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir))
if perr != nil {
return nil, perr
}
@@ -196,7 +196,7 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
rs := make([]io.Reader, 0)
ls := make([]*fileutil.LockedFile, 0)
for _, name := range names[nameIndex:] {
p := path.Join(dirpath, name)
p := filepath.Join(dirpath, name)
if write {
l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
if err != nil {
@@ -232,7 +232,7 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
// write reuses the file descriptors from read; don't close so
// WAL can append without dropping the file lock
w.readClose = nil
if _, _, err := parseWalName(path.Base(w.tail().Name())); err != nil {
if _, _, err := parseWalName(filepath.Base(w.tail().Name())); err != nil {
closer()
return nil, err
}
@@ -322,7 +322,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
// not all, will cause CRC errors on WAL open. Since the records
// were never fully synced to disk in the first place, it's safe
// to zero them out to avoid any CRC errors from new writes.
if _, err = w.tail().Seek(w.decoder.lastOffset(), os.SEEK_SET); err != nil {
if _, err = w.tail().Seek(w.decoder.lastOffset(), io.SeekStart); err != nil {
return nil, state, nil, err
}
if err = fileutil.ZeroToEnd(w.tail().File); err != nil {
@@ -361,7 +361,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
// Then cut atomically rename temp wal file to a wal file.
func (w *WAL) cut() error {
// close old wal file; truncate to avoid wasting space if an early cut
off, serr := w.tail().Seek(0, os.SEEK_CUR)
off, serr := w.tail().Seek(0, io.SeekCurrent)
if serr != nil {
return serr
}
@@ -372,7 +372,7 @@ func (w *WAL) cut() error {
return err
}
fpath := path.Join(w.dir, walName(w.seq()+1, w.enti+1))
fpath := filepath.Join(w.dir, walName(w.seq()+1, w.enti+1))
// create a temp wal file with name sequence + 1, or truncate the existing one
newTail, err := w.fp.Open()
@@ -401,7 +401,7 @@ func (w *WAL) cut() error {
return err
}
off, err = w.tail().Seek(0, os.SEEK_CUR)
off, err = w.tail().Seek(0, io.SeekCurrent)
if err != nil {
return err
}
@@ -418,7 +418,7 @@ func (w *WAL) cut() error {
if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
return err
}
if _, err = newTail.Seek(off, os.SEEK_SET); err != nil {
if _, err = newTail.Seek(off, io.SeekStart); err != nil {
return err
}
@@ -464,7 +464,7 @@ func (w *WAL) ReleaseLockTo(index uint64) error {
found := false
for i, l := range w.locks {
_, lockIndex, err := parseWalName(path.Base(l.Name()))
_, lockIndex, err := parseWalName(filepath.Base(l.Name()))
if err != nil {
return err
}
@@ -552,7 +552,7 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
return nil
}
mustSync := mustSync(st, w.state, len(ents))
mustSync := raft.MustSync(st, w.state, len(ents))
// TODO(xiangli): no more reference operator
for i := range ents {
@@ -564,7 +564,7 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
return err
}
curOff, err := w.tail().Seek(0, os.SEEK_CUR)
curOff, err := w.tail().Seek(0, io.SeekCurrent)
if err != nil {
return err
}
@@ -611,22 +611,13 @@ func (w *WAL) seq() uint64 {
if t == nil {
return 0
}
seq, _, err := parseWalName(path.Base(t.Name()))
seq, _, err := parseWalName(filepath.Base(t.Name()))
if err != nil {
plog.Fatalf("bad wal name %s (%v)", t.Name(), err)
}
return seq
}
func mustSync(st, prevst raftpb.HardState, entsnum int) bool {
// Persistent state on all servers:
// (Updated on stable storage before responding to RPCs)
// currentTerm
// votedFor
// log entries[]
return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
}
func closeAll(rcs ...io.ReadCloser) error {
for _, f := range rcs {
if err := f.Close(); err != nil {


@@ -506,7 +506,7 @@ func init() { proto.RegisterFile("record.proto", fileDescriptorRecord) }
var fileDescriptorRecord = []byte{
// 186 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x29, 0x4a, 0x4d, 0xce,
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x29, 0x4a, 0x4d, 0xce,
0x2f, 0x4a, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2d, 0x4f, 0xcc, 0x29, 0x48, 0x92,
0x12, 0x49, 0xcf, 0x4f, 0xcf, 0x07, 0x8b, 0xe8, 0x83, 0x58, 0x10, 0x49, 0x25, 0x3f, 0x2e, 0xb6,
0x20, 0xb0, 0x62, 0x21, 0x09, 0x2e, 0x96, 0x92, 0xca, 0x82, 0x54, 0x09, 0x46, 0x05, 0x46, 0x0d,