diff --git a/README.md b/README.md index fe68c944..77321125 100644 --- a/README.md +++ b/README.md @@ -144,6 +144,9 @@ To start a single-node Kosu development network as a validator, and initialize a ./kosud --init --web3 wss://ethnet.zaidan.io/ws/ropsten ``` +To start the `JSON-RPC` bridge and interact with kosud use the `kosud rpc` sub-command. +By default the HTTP and WS endpoints are binded to ports `14341` and `14342` repectively. + The command-line interface will also be built (see `kosu-cli help` for all commands). ``` diff --git a/packages/go-kosu/abci/config.go b/packages/go-kosu/abci/config.go index 87fe8f7a..9f3d92da 100644 --- a/packages/go-kosu/abci/config.go +++ b/packages/go-kosu/abci/config.go @@ -3,6 +3,7 @@ package abci import ( "encoding/base64" "encoding/json" + "errors" "fmt" "io/ioutil" "os" @@ -21,6 +22,8 @@ const ( KOSUHOME = ".kosu" ) +var errHomeDirNotFound = errors.New("homedir does not exists! Did you run the init command?") + // DefaultHomeDir is the default full path used to store config and data var DefaultHomeDir = os.ExpandEnv(fmt.Sprintf("$HOME/%s", KOSUHOME)) @@ -31,7 +34,7 @@ func LoadConfig(homedir string) (*config.Config, error) { } if !common.FileExists(filepath.Join(homedir, "config", "config.toml")) { - return nil, fmt.Errorf("missing homedir! Did you run the init command?") + return nil, errHomeDirNotFound } // Have a config file, load it @@ -43,7 +46,7 @@ func LoadConfig(homedir string) (*config.Config, error) { // I don't think this ever returns an err. It seems to create a default config if missing err := viper.ReadInConfig() if err != nil { - return nil, fmt.Errorf("missing homedir/config file. Did you run 'kosud --init'?") + return nil, errHomeDirNotFound } cfg := config.DefaultConfig() diff --git a/packages/go-kosu/cmd/kosud/main.go b/packages/go-kosu/cmd/kosud/main.go index 6d447c3e..fa929f9d 100644 --- a/packages/go-kosu/cmd/kosud/main.go +++ b/packages/go-kosu/cmd/kosud/main.go @@ -12,6 +12,7 @@ import ( "github.com/tendermint/tendermint/libs/log" "go-kosu/abci" + "go-kosu/rpc" "go-kosu/witness" ) @@ -96,8 +97,8 @@ func main() { } }, } - rootCmd.Flags().StringVar(&cfg.Home, "home", "~/.kosu", "directory for config and data") - rootCmd.Flags().BoolVar(&cfg.Debug, "debug", false, "enable debuging") + rootCmd.PersistentFlags().StringVar(&cfg.Home, "home", "~/.kosu", "directory for config and data") + rootCmd.PersistentFlags().BoolVar(&cfg.Debug, "debug", false, "enable debuging") rootCmd.Flags().StringVar(&cfg.Web3, "web3", "wss://ethnet.zaidan.io/ws/kosu", "web3 provider URL") rootCmd.Flags().BoolVar(&cfg.Init, "init", false, "initializes directory like 'tendermint init' does") @@ -116,6 +117,8 @@ func main() { } }) + rootCmd.AddCommand(rpc.NewCommand()) + if err := rootCmd.Execute(); err != nil { stdlog.Fatal(err) } diff --git a/packages/go-kosu/rpc/client.go b/packages/go-kosu/rpc/client.go new file mode 100644 index 00000000..773f04ae --- /dev/null +++ b/packages/go-kosu/rpc/client.go @@ -0,0 +1,55 @@ +package rpc + +import ( + "context" + + "github.com/ethereum/go-ethereum/rpc" +) + +type Client struct { + rpc *rpc.Client +} + +func DialInProc(srv *rpc.Server) *Client { + return &Client{ + rpc: rpc.DialInProc(srv), + } +} + +func (c *Client) Subscribe(ctx context.Context, fn func(interface{}), query string) error { + ch := make(chan interface{}) + args := []interface{}{"subscribe", query} + sub, err := c.rpc.Subscribe(ctx, "kosu", ch, args...) + if err != nil { + return err + } + + go func() { + defer close(ch) + defer sub.Unsubscribe() + + for { + select { + case <-ctx.Done(): + return + case <-sub.Err(): + return + case i := <-ch: + fn(i) + } + } + }() + return nil +} + +func (c *Client) Call(result interface{}, ns, method string, args ...interface{}) error { + return c.rpc.Call(result, ns+"_"+method, args...) +} + +func (c *Client) LatestHeight() (int64, error) { + var latestHeight int64 + if err := c.Call(&latestHeight, "kosu", "latestHeight"); err != nil { + return 0, err + } + return latestHeight, nil +} diff --git a/packages/go-kosu/rpc/cmd.go b/packages/go-kosu/rpc/cmd.go new file mode 100644 index 00000000..f7b79cc2 --- /dev/null +++ b/packages/go-kosu/rpc/cmd.go @@ -0,0 +1,104 @@ +package rpc + +import ( + "errors" + "fmt" + "go-kosu/abci" + "log" + "net/http" + "sync" + + "github.com/ethereum/go-ethereum/rpc" + "github.com/spf13/cobra" +) + +// NewCommand returns a new cobra.Command to be attached as a sub-command +func NewCommand() *cobra.Command { + var ( + url string + + http bool + httpPort int + + ws bool + wsPort int + + key []byte + ) + cmd := &cobra.Command{ + Use: "rpc", + Short: "starts the rpc bridge", + Long: "The RPC bridge exposes a set of kosud functionalities over JSON-RPC 2.0", + PreRunE: func(cmd *cobra.Command, args []string) error { + if http == false && ws == false { + return errors.New("both `--ws` and `--http` where false, you need to enable at least one") + } + + var homeDir string + if home := cmd.Flag("home"); home == nil { + homeDir = "~/kosu" + } else { + homeDir = home.Value.String() + } + + var err error + key, err = abci.LoadPrivateKey(homeDir) + if err != nil { + return err + } + + return nil + }, + Run: func(cmd *cobra.Command, args []string) { + client := abci.NewHTTPClient(url, key) + srv := NewServer(client) + + wg := sync.WaitGroup{} + if http == true { + wg.Add(1) + go func() { + defer wg.Done() + if err := startHTTP(srv, httpPort); err != nil { + log.Printf("http: %s", err) + } + }() + } + + if ws == true { + wg.Add(1) + go func() { + defer wg.Done() + if err := startWS(srv, wsPort); err != nil { + log.Printf("ws: %s", err) + } + }() + } + wg.Wait() + }, + } + + cmd.Flags().StringVar(&url, "url", "http://localhost:26657", "URL exposed by kosud") + cmd.Flags().BoolVar(&http, "http", true, "Starts the HTTP server") + cmd.Flags().IntVar(&httpPort, "http-port", 14341, "HTTP server listening port") + + cmd.Flags().BoolVar(&ws, "ws", true, "Starts the WebSocket server") + cmd.Flags().IntVar(&wsPort, "ws-port", 14342, "WebSocket server listening port") + + return cmd +} + +func startHTTP(srv *rpc.Server, port int) error { + bind := fmt.Sprintf(":%d", port) + log.Printf("Starting HTTP server on %s", bind) + return http.ListenAndServe(bind, srv) +} + +func startWS(srv *rpc.Server, port int) error { + bind := fmt.Sprintf(":%d", port) + log.Printf("Starting WS server on %s", bind) + + return http.ListenAndServe( + bind, + srv.WebsocketHandler([]string{"*"}), + ) +} diff --git a/packages/go-kosu/rpc/doctool/main.go b/packages/go-kosu/rpc/doctool/main.go new file mode 100644 index 00000000..d579d266 --- /dev/null +++ b/packages/go-kosu/rpc/doctool/main.go @@ -0,0 +1,44 @@ +package main + +import ( + "encoding/json" + "fmt" + "go/doc" + "go/parser" + "go/token" + "log" + "os" +) + +type DocEntry struct { + Method string `json:"method"` + Text string `json:"text"` +} + +func main() { + if len(os.Args) < 2 { + fmt.Println("Usage: doctool ") + return + } + + fset := token.NewFileSet() + pkgs, err := parser.ParseDir(fset, os.Args[1], nil, parser.ParseComments) + if err != nil { + log.Fatal(err) + } + + docs := []DocEntry{} + pkg := doc.New(pkgs["rpc"], os.Args[1], doc.AllDecls) + for _, t := range pkg.Types { + for _, m := range t.Methods { + docs = append(docs, DocEntry{Method: m.Name, Text: m.Doc}) + } + } + + text, err := json.MarshalIndent(docs, "", " ") + if err != nil { + log.Fatal(err) + } + + fmt.Printf("%s\n", text) +} diff --git a/packages/go-kosu/rpc/rpc.go b/packages/go-kosu/rpc/rpc.go new file mode 100644 index 00000000..c990a389 --- /dev/null +++ b/packages/go-kosu/rpc/rpc.go @@ -0,0 +1,13 @@ +package rpc + +import ( + "go-kosu/abci" + + "github.com/ethereum/go-ethereum/rpc" +) + +func NewServer(abci *abci.Client) *rpc.Server { + srv := rpc.NewServer() + srv.RegisterName("kosu", &Service{abci: abci}) + return srv +} diff --git a/packages/go-kosu/rpc/rpc_test.go b/packages/go-kosu/rpc/rpc_test.go new file mode 100644 index 00000000..e1b34cf3 --- /dev/null +++ b/packages/go-kosu/rpc/rpc_test.go @@ -0,0 +1,45 @@ +package rpc + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go-kosu/abci" + "go-kosu/tests" + + "github.com/tendermint/tendermint/libs/db" +) + +func TestRPCLatestHeight(t *testing.T) { + _, closer := tests.StartServer(t, db.NewMemDB()) + defer closer() + client := DialInProc( + NewServer( + abci.NewHTTPClient("http://localhost:26657", nil), + ), + ) + + // Get the initial (prior the first block is mined) + latest, err := client.LatestHeight() + require.NoError(t, err) + assert.EqualValues(t, 0, latest) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + fn := func(i interface{}) { + // this is invoked when a block is mined + latest, err := client.LatestHeight() + require.NoError(t, err) + assert.EqualValues(t, 1, latest) + + cancel() + } + + err = client.Subscribe(ctx, fn, "tm.event = 'NewBlock'") + require.NoError(t, err) + + <-ctx.Done() +} diff --git a/packages/go-kosu/rpc/service.go b/packages/go-kosu/rpc/service.go new file mode 100644 index 00000000..96398841 --- /dev/null +++ b/packages/go-kosu/rpc/service.go @@ -0,0 +1,67 @@ +package rpc + +import ( + "context" + "go-kosu/abci" + + "github.com/ethereum/go-ethereum/rpc" +) + +// Service is a RPC service +type Service struct { + abci *abci.Client +} + +// NewService returns a new service given a abci client +func NewService(abci *abci.Client) *Service { + return &Service{ + abci: abci, + } +} + +// Subscribe subscribes to the ABCI events +// To tell which events you want, you need to provide a query. +// More information about query can be found here: https://tendermint.com/rpc/#subscribe +func (s *Service) Subscribe(ctx context.Context, query string) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return nil, rpc.ErrNotificationsUnsupported + } + + events, closer, err := s.abci.Subscribe(ctx, query) + if err != nil { + return nil, err + } + + rpcSub := notifier.CreateSubscription() + go func() { + defer closer() + + for { + select { + case <-rpcSub.Err(): + return + case <-notifier.Closed(): + return + case e := <-events: + notifier.Notify(rpcSub.ID, e) + } + } + }() + + return rpcSub, nil +} + +// LatestHeight returns the height of the best known block +// The `latestHeight` method will return the integer height of the latest block committed to the blockchain.", +func (s *Service) LatestHeight() (int64, error) { + res, err := s.abci.Block(nil) + if err != nil { + return 0, err + } + if res.Block == nil { + return 0, nil + } + + return res.Block.Height, nil +} diff --git a/packages/go-kosu/tests/suite_test.go b/packages/go-kosu/tests/suite_test.go index 04bdd7d0..cf15fbbb 100644 --- a/packages/go-kosu/tests/suite_test.go +++ b/packages/go-kosu/tests/suite_test.go @@ -2,8 +2,13 @@ package tests import ( "go-kosu/abci" + "testing" + . "github.com/smartystreets/goconvey/convey" //nolint + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + + "github.com/tendermint/tendermint/libs/db" ) type Suite struct { @@ -12,3 +17,35 @@ type Suite struct { client *abci.Client app *abci.App } + +// GivenABCIServer a ABCI Server inside a Convey block +func GivenABCIServer(t *testing.T, suite *Suite, fn func(*testing.T)) { + Convey("Given an ABCI Server", t, func() { + app, closer := StartServer(t, db.NewMemDB()) + defer closer() + suite.app = app + + key, err := abci.LoadPrivateKey(app.Config.RootDir) + require.NoError(t, err) + + suite.client = abci.NewHTTPClient("http://localhost:26657", key) + fn(t) + }) +} + +// BroadcastTxSync is a helper function to Broadcast a Tx under test +func BroadcastTxSync(t *testing.T, c *abci.Client, tx interface{}) *rpctypes.ResultBroadcastTx { + res, err := c.BroadcastTxSync(tx) + So(err, ShouldBeNil) + require.Zero(t, res.Code, res.Log) + return res +} + +// BroadcastTxCommit is a helper function to Broadcast a Tx under test +func BroadcastTxCommit(t *testing.T, c *abci.Client, tx interface{}) *rpctypes.ResultBroadcastTxCommit { + res, err := c.BroadcastTxCommit(tx) + So(err, ShouldBeNil) + require.True(t, res.CheckTx.IsOK(), res.CheckTx.Log) + require.True(t, res.DeliverTx.IsOK(), res.DeliverTx.Log) + return res +} diff --git a/packages/go-kosu/tests/support.go b/packages/go-kosu/tests/support.go index 1cf19ec4..eaf2b62c 100644 --- a/packages/go-kosu/tests/support.go +++ b/packages/go-kosu/tests/support.go @@ -6,31 +6,14 @@ import ( "testing" "time" - . "github.com/smartystreets/goconvey/convey" //nolint "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/libs/db" "github.com/tendermint/tendermint/libs/log" - rpctypes "github.com/tendermint/tendermint/rpc/core/types" "go-kosu/abci" ) -// GivenABCIServer a ABCI Server inside a Convey block -func GivenABCIServer(t *testing.T, suite *Suite, fn func(*testing.T)) { - Convey("Given an ABCI Server", t, func() { - app, closer := StartServer(t, db.NewMemDB()) - defer closer() - suite.app = app - - key, err := abci.LoadPrivateKey(app.Config.RootDir) - require.NoError(t, err) - - suite.client = abci.NewHTTPClient("http://localhost:26657", key) - fn(t) - }) -} - // StartServer starts a kosud test server func StartServer(t *testing.T, db db.DB) (*abci.App, func()) { // Create a temp dir and initialize tendermint there @@ -55,20 +38,3 @@ func StartServer(t *testing.T, db db.DB) (*abci.App, func()) { os.RemoveAll(dir) } } - -// BroadcastTxSync is a helper function to Broadcast a Tx under test -func BroadcastTxSync(t *testing.T, c *abci.Client, tx interface{}) *rpctypes.ResultBroadcastTx { - res, err := c.BroadcastTxSync(tx) - So(err, ShouldBeNil) - require.Zero(t, res.Code, res.Log) - return res -} - -// BroadcastTxCommit is a helper function to Broadcast a Tx under test -func BroadcastTxCommit(t *testing.T, c *abci.Client, tx interface{}) *rpctypes.ResultBroadcastTxCommit { - res, err := c.BroadcastTxCommit(tx) - So(err, ShouldBeNil) - require.True(t, res.CheckTx.IsOK(), res.CheckTx.Log) - require.True(t, res.DeliverTx.IsOK(), res.DeliverTx.Log) - return res -}