diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2015-11-12 14:47:59 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2015-11-12 15:01:12 +0900 |
commit | 95e7cc73083dfbc161b4ac6b0271c006d822d38d (patch) | |
tree | f76aeb30f19ee66aca066f366f105c0d0a5bf20c | |
parent | 180d68f88f457096ec92bcb6eabd915a64bb8858 (diff) |
mrt: support reset/rotate commands
$ gobgp mrt update reset
create a new file or reopen the existing file and continue to write
update messages to it.
$ gobgp mrt update <filename>
rename the current logging file to <filename> and truncate the logging
file and continue to write update messages to it.
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | api/gobgp.pb.go | 37 | ||||
-rw-r--r-- | api/gobgp.proto | 7 | ||||
-rw-r--r-- | gobgp/cmd/common.go | 2 | ||||
-rw-r--r-- | gobgp/cmd/mrt.go | 35 | ||||
-rw-r--r-- | server/grpc_server.go | 5 | ||||
-rw-r--r-- | server/server.go | 42 | ||||
-rw-r--r-- | server/watcher.go | 57 |
7 files changed, 180 insertions, 5 deletions
diff --git a/api/gobgp.pb.go b/api/gobgp.pb.go index 27e0c5ed..04f6f9b0 100644 --- a/api/gobgp.pb.go +++ b/api/gobgp.pb.go @@ -14,6 +14,7 @@ It has these top-level messages: ModPathArguments ModNeighborArguments MrtArguments + ModMrtArguments ModVrfArguments ModDefinedSetArguments ModStatementArguments @@ -392,6 +393,15 @@ func (m *MrtArguments) Reset() { *m = MrtArguments{} } func (m *MrtArguments) String() string { return proto.CompactTextString(m) } func (*MrtArguments) ProtoMessage() {} +type ModMrtArguments struct { + Operation Operation `protobuf:"varint,1,opt,name=operation,enum=gobgpapi.Operation" json:"operation,omitempty"` + Filename string `protobuf:"bytes,2,opt,name=filename" json:"filename,omitempty"` +} + +func (m *ModMrtArguments) Reset() { *m = ModMrtArguments{} } +func (m *ModMrtArguments) String() string { return proto.CompactTextString(m) } +func (*ModMrtArguments) ProtoMessage() {} + type ModVrfArguments struct { Operation Operation `protobuf:"varint,1,opt,name=operation,enum=gobgpapi.Operation" json:"operation,omitempty"` Vrf *Vrf `protobuf:"bytes,2,opt,name=vrf" json:"vrf,omitempty"` @@ -1518,6 +1528,7 @@ type GobgpApiClient interface { MonitorBestChanged(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (GobgpApi_MonitorBestChangedClient, error) MonitorPeerState(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (GobgpApi_MonitorPeerStateClient, error) GetMrt(ctx context.Context, in *MrtArguments, opts ...grpc.CallOption) (GobgpApi_GetMrtClient, error) + ModMrt(ctx context.Context, in *ModMrtArguments, opts ...grpc.CallOption) (*Error, error) GetRPKI(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (GobgpApi_GetRPKIClient, error) GetROA(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (GobgpApi_GetROAClient, error) GetVrfs(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (GobgpApi_GetVrfsClient, error) @@ -1836,6 +1847,15 @@ func (x *gobgpApiGetMrtClient) Recv() (*MrtMessage, error) { return m, nil } +func (c *gobgpApiClient) ModMrt(ctx context.Context, in *ModMrtArguments, opts ...grpc.CallOption) (*Error, error) { + out := new(Error) + err := grpc.Invoke(ctx, "/gobgpapi.GobgpApi/ModMrt", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *gobgpApiClient) GetRPKI(ctx context.Context, in *Arguments, opts ...grpc.CallOption) (GobgpApi_GetRPKIClient, error) { stream, err := grpc.NewClientStream(ctx, &_GobgpApi_serviceDesc.Streams[6], c.cc, "/gobgpapi.GobgpApi/GetRPKI", opts...) if err != nil { @@ -2129,6 +2149,7 @@ type GobgpApiServer interface { MonitorBestChanged(*Arguments, GobgpApi_MonitorBestChangedServer) error MonitorPeerState(*Arguments, GobgpApi_MonitorPeerStateServer) error GetMrt(*MrtArguments, GobgpApi_GetMrtServer) error + ModMrt(context.Context, *ModMrtArguments) (*Error, error) GetRPKI(*Arguments, GobgpApi_GetRPKIServer) error GetROA(*Arguments, GobgpApi_GetROAServer) error GetVrfs(*Arguments, GobgpApi_GetVrfsServer) error @@ -2413,6 +2434,18 @@ func (x *gobgpApiGetMrtServer) Send(m *MrtMessage) error { return x.ServerStream.SendMsg(m) } +func _GobgpApi_ModMrt_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { + in := new(ModMrtArguments) + if err := dec(in); err != nil { + return nil, err + } + out, err := srv.(GobgpApiServer).ModMrt(ctx, in) + if err != nil { + return nil, err + } + return out, nil +} + func _GobgpApi_GetRPKI_Handler(srv interface{}, stream grpc.ServerStream) error { m := new(Arguments) if err := stream.RecvMsg(m); err != nil { @@ -2696,6 +2729,10 @@ var _GobgpApi_serviceDesc = grpc.ServiceDesc{ Handler: _GobgpApi_Disable_Handler, }, { + MethodName: "ModMrt", + Handler: _GobgpApi_ModMrt_Handler, + }, + { MethodName: "ModVrf", Handler: _GobgpApi_ModVrf_Handler, }, diff --git a/api/gobgp.proto b/api/gobgp.proto index 076a018b..fc6d8f69 100644 --- a/api/gobgp.proto +++ b/api/gobgp.proto @@ -37,6 +37,7 @@ service GobgpApi { rpc MonitorBestChanged(Arguments) returns (stream Destination) {} rpc MonitorPeerState(Arguments) returns (stream Peer) {} rpc GetMrt(MrtArguments) returns (stream MrtMessage) {} + rpc ModMrt(ModMrtArguments) returns (Error) {} rpc GetRPKI(Arguments) returns (stream RPKI) {} rpc GetROA(Arguments) returns (stream ROA) {} rpc GetVrfs(Arguments) returns (stream Vrf) {} @@ -79,6 +80,7 @@ message ModNeighborArguments { Operation operation = 1; Peer peer = 2; } + message MrtArguments { Resource resource = 1; uint32 rf = 2; @@ -86,6 +88,11 @@ message MrtArguments { string neighbor_address = 4; } +message ModMrtArguments { + Operation operation = 1; + string filename = 2; +} + message ModVrfArguments { Operation operation = 1; Vrf vrf = 2; diff --git a/gobgp/cmd/common.go b/gobgp/cmd/common.go index 164d441d..fa91e326 100644 --- a/gobgp/cmd/common.go +++ b/gobgp/cmd/common.go @@ -66,6 +66,8 @@ const ( CMD_STATEMENT = "statement" CMD_CONDITION = "condition" CMD_ACTION = "action" + CMD_UPDATE = "update" + CMD_ROTATE = "rotate" ) var subOpts struct { diff --git a/gobgp/cmd/mrt.go b/gobgp/cmd/mrt.go index 55ccf11e..219a5616 100644 --- a/gobgp/cmd/mrt.go +++ b/gobgp/cmd/mrt.go @@ -413,10 +413,43 @@ func NewMrtCmd() *cobra.Command { } injectCmd.AddCommand(globalInjectCmd) + rotateCmd := &cobra.Command{ + Use: CMD_ROTATE, + Run: func(cmd *cobra.Command, args []string) { + if len(args) != 1 { + fmt.Println("usage: gobgp mrt update rotate <filename>") + os.Exit(1) + } + arg := &api.ModMrtArguments{ + Operation: api.Operation_REPLACE, + Filename: args[0], + } + client.ModMrt(context.Background(), arg) + }, + } + restartCmd := &cobra.Command{ + Use: CMD_RESET, + Run: func(cmd *cobra.Command, args []string) { + if len(args) > 0 { + fmt.Println("usage: gobgp mrt update reset") + os.Exit(1) + } + arg := &api.ModMrtArguments{ + Operation: api.Operation_REPLACE, + } + client.ModMrt(context.Background(), arg) + }, + } + + updateCmd := &cobra.Command{ + Use: CMD_UPDATE, + } + updateCmd.AddCommand(restartCmd, rotateCmd) + mrtCmd := &cobra.Command{ Use: CMD_MRT, } - mrtCmd.AddCommand(dumpCmd, injectCmd) + mrtCmd.AddCommand(dumpCmd, injectCmd, updateCmd) return mrtCmd } diff --git a/server/grpc_server.go b/server/grpc_server.go index c475f232..5ec5947b 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -48,6 +48,7 @@ const ( REQ_MONITOR_NEIGHBOR_PEER_STATE REQ_MRT_GLOBAL_RIB REQ_MRT_LOCAL_RIB + REQ_MOD_MRT REQ_RPKI REQ_ROA REQ_VRF @@ -259,6 +260,10 @@ func (s *Server) GetMrt(arg *api.MrtArguments, stream api.GobgpApi_GetMrtServer) }) } +func (s *Server) ModMrt(ctx context.Context, arg *api.ModMrtArguments) (*api.Error, error) { + return s.mod(REQ_MOD_MRT, arg) +} + func (s *Server) GetRPKI(arg *api.Arguments, stream api.GobgpApi_GetRPKIServer) error { req := NewGrpcRequest(REQ_RPKI, "", bgp.RouteFamily(arg.Rf), nil) s.bgpServerCh <- req diff --git a/server/server.go b/server/server.go index 8a5cabdf..46b9b44d 100644 --- a/server/server.go +++ b/server/server.go @@ -1691,6 +1691,8 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { server.broadcastReqs = append(server.broadcastReqs, grpcReq) case REQ_MRT_GLOBAL_RIB, REQ_MRT_LOCAL_RIB: server.handleMrt(grpcReq) + case REQ_MOD_MRT: + server.handleModMrt(grpcReq) case REQ_ROA, REQ_RPKI: server.roaClient.handleGRPC(grpcReq) case REQ_VRF, REQ_VRFS, REQ_VRF_MOD: @@ -2197,6 +2199,46 @@ func (server *BgpServer) handleGrpcModPolicyAssignment(grpcReq *GrpcRequest) err return err } +func (server *BgpServer) handleModMrt(grpcReq *GrpcRequest) { + done := func(e error) { + result := &GrpcResponse{ + ResponseErr: e, + } + grpcReq.ResponseCh <- result + close(grpcReq.ResponseCh) + } + arg := grpcReq.Data.(*api.ModMrtArguments) + w, y := server.watchers[WATCHER_MRT] + if arg.Operation == api.Operation_ADD { + if y { + done(fmt.Errorf("already enabled")) + return + } + } else { + if !y { + done(fmt.Errorf("not enabled yet")) + return + } + } + switch arg.Operation { + case api.Operation_ADD: + w, err := newMrtWatcher(arg.Filename) + if err == nil { + server.watchers[WATCHER_MRT] = w + } + done(err) + case api.Operation_DEL: + delete(server.watchers, WATCHER_MRT) + w.stop() + done(nil) + case api.Operation_REPLACE: + go func() { + err := w.restart(arg.Filename) + done(err) + }() + } +} + func (server *BgpServer) handleMrt(grpcReq *GrpcRequest) { now := uint32(time.Now().Unix()) view := "" diff --git a/server/watcher.go b/server/watcher.go index a4c3a192..bed70538 100644 --- a/server/watcher.go +++ b/server/watcher.go @@ -16,6 +16,7 @@ package server import ( + "fmt" log "github.com/Sirupsen/logrus" "github.com/osrg/gobgp/packet" "gopkg.in/tomb.v2" @@ -66,14 +67,21 @@ type watcherEventUpdateMsg struct { type watcher interface { notify(watcherEventType) chan watcherEvent + restart(string) error stop() } +type mrtWatcherOp struct { + filename string //used for rotate + result chan error +} + type mrtWatcher struct { t tomb.Tomb filename string file *os.File ch chan watcherEvent + opCh chan *mrtWatcherOp } func (w *mrtWatcher) notify(t watcherEventType) chan watcherEvent { @@ -87,7 +95,21 @@ func (w *mrtWatcher) stop() { w.t.Kill(nil) } +func (w *mrtWatcher) restart(filename string) error { + adminOp := &mrtWatcherOp{ + filename: filename, + result: make(chan error), + } + select { + case w.opCh <- adminOp: + default: + return fmt.Errorf("already an admin operaiton in progress") + } + return <-adminOp.result +} + func (w *mrtWatcher) loop() error { + defer w.file.Close() for { write := func(ev watcherEvent) { m := ev.(*watcherEventUpdateMsg) @@ -117,28 +139,55 @@ func (w *mrtWatcher) loop() error { } } - select { - case <-w.t.Dying(): + drain := func() { for len(w.ch) > 0 { m := <-w.ch write(m) } + } + + select { + case <-w.t.Dying(): + drain() return nil case m := <-w.ch: write(m) + case adminOp := <-w.opCh: + var err error + if adminOp.filename != "" { + err = os.Rename(w.file.Name(), adminOp.filename) + } + if err == nil { + var file *os.File + file, err = mrtFileOpen(w.file.Name()) + if err == nil { + w.file.Close() + w.file = file + } + } + adminOp.result <- err } } } -func newMrtWatcher(filename string) (*mrtWatcher, error) { +func mrtFileOpen(filename string) (*os.File, error) { file, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644) if err != nil { - log.Fatal(err) + log.Warn(err) + } + return file, err +} + +func newMrtWatcher(filename string) (*mrtWatcher, error) { + file, err := mrtFileOpen(filename) + if err != nil { + return nil, err } w := mrtWatcher{ filename: filename, file: file, ch: make(chan watcherEvent), + opCh: make(chan *mrtWatcherOp, 1), } w.t.Go(w.loop) return &w, nil |