@@ -24,6 +24,7 @@ import (
2424 "context"
2525 "os"
2626 "runtime"
27+ "strconv"
2728 "testing"
2829 "time"
2930
@@ -736,3 +737,169 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) {
736737 cancelInput ()
737738 env .waitUntilInputStops ()
738739}
740+
741+ // test_symlinks_enabled from test_harvester.py
742+ func TestFilestreamSymlinksEnabled (t * testing.T ) {
743+ env := newInputTestingEnvironment (t )
744+
745+ testlogName := "test.log"
746+ symlinkName := "test.log.symlink"
747+ inp := env .mustCreateInput (map [string ]interface {}{
748+ "paths" : []string {
749+ env .abspath (symlinkName ),
750+ },
751+ "prospector.scanner.symlinks" : "true" ,
752+ })
753+
754+ testlines := []byte ("first line\n " )
755+ env .mustWriteLinesToFile (testlogName , testlines )
756+
757+ env .mustSymlink (testlogName , symlinkName )
758+
759+ ctx , cancelInput := context .WithCancel (context .Background ())
760+ env .startInput (ctx , inp )
761+
762+ env .waitUntilEventCount (1 )
763+
764+ cancelInput ()
765+ env .waitUntilInputStops ()
766+
767+ env .requireOffsetInRegistry (testlogName , len (testlines ))
768+ }
769+
770+ // test_symlink_rotated from test_harvester.py
771+ func TestFilestreamSymlinkRotated (t * testing.T ) {
772+ env := newInputTestingEnvironment (t )
773+
774+ firstTestlogName := "test1.log"
775+ secondTestlogName := "test2.log"
776+ symlinkName := "test.log"
777+ inp := env .mustCreateInput (map [string ]interface {}{
778+ "paths" : []string {
779+ env .abspath (symlinkName ),
780+ },
781+ "prospector.scanner.check_interval" : "1ms" ,
782+ "prospector.scanner.symlinks" : "true" ,
783+ "close.on_state_change.removed" : "false" ,
784+ "clean_removed" : "false" ,
785+ })
786+
787+ commonLine := "first line in file "
788+ for i , path := range []string {firstTestlogName , secondTestlogName } {
789+ env .mustWriteLinesToFile (path , []byte (commonLine + strconv .Itoa (i )+ "\n " ))
790+ }
791+
792+ env .mustSymlink (firstTestlogName , symlinkName )
793+
794+ ctx , cancelInput := context .WithCancel (context .Background ())
795+ env .startInput (ctx , inp )
796+
797+ env .waitUntilEventCount (1 )
798+
799+ expectedOffset := len (commonLine ) + 2
800+ env .requireOffsetInRegistry (firstTestlogName , expectedOffset )
801+
802+ // rotate symlink
803+ env .mustRemoveFile (symlinkName )
804+ env .mustSymlink (secondTestlogName , symlinkName )
805+
806+ moreLines := "second line in file 2\n third line in file 2\n "
807+ env .mustAppendLinesToFile (secondTestlogName , []byte (moreLines ))
808+
809+ env .waitUntilEventCount (4 )
810+ env .requireOffsetInRegistry (firstTestlogName , expectedOffset )
811+ env .requireOffsetInRegistry (secondTestlogName , expectedOffset + len (moreLines ))
812+
813+ cancelInput ()
814+ env .waitUntilInputStops ()
815+
816+ env .requireRegistryEntryCount (2 )
817+ }
818+
819+ // test_symlink_removed from test_harvester.py
820+ func TestFilestreamSymlinkRemoved (t * testing.T ) {
821+ env := newInputTestingEnvironment (t )
822+
823+ testlogName := "test.log"
824+ symlinkName := "test.log.symlink"
825+ inp := env .mustCreateInput (map [string ]interface {}{
826+ "paths" : []string {
827+ env .abspath (symlinkName ),
828+ },
829+ "prospector.scanner.check_interval" : "1ms" ,
830+ "prospector.scanner.symlinks" : "true" ,
831+ "close.on_state_change.removed" : "false" ,
832+ "clean_removed" : "false" ,
833+ })
834+
835+ line := []byte ("first line\n " )
836+ env .mustWriteLinesToFile (testlogName , line )
837+
838+ env .mustSymlink (testlogName , symlinkName )
839+
840+ ctx , cancelInput := context .WithCancel (context .Background ())
841+ env .startInput (ctx , inp )
842+
843+ env .waitUntilEventCount (1 )
844+
845+ env .requireOffsetInRegistry (testlogName , len (line ))
846+
847+ // remove symlink
848+ env .mustRemoveFile (symlinkName )
849+
850+ env .mustAppendLinesToFile (testlogName , line )
851+
852+ env .waitUntilEventCount (2 )
853+ env .requireOffsetInRegistry (testlogName , 2 * len (line ))
854+
855+ cancelInput ()
856+ env .waitUntilInputStops ()
857+
858+ env .requireRegistryEntryCount (1 )
859+ }
860+
861+ // test_truncate from test_harvester.py
862+ func TestFilestreamTruncate (t * testing.T ) {
863+ env := newInputTestingEnvironment (t )
864+
865+ testlogName := "test.log"
866+ symlinkName := "test.log.symlink"
867+ inp := env .mustCreateInput (map [string ]interface {}{
868+ "paths" : []string {
869+ env .abspath ("*" ),
870+ },
871+ "prospector.scanner.check_interval" : "1ms" ,
872+ "prospector.scanner.symlinks" : "true" ,
873+ })
874+
875+ lines := []byte ("first line\n second line\n third line\n " )
876+ env .mustWriteLinesToFile (testlogName , lines )
877+
878+ env .mustSymlink (testlogName , symlinkName )
879+
880+ ctx , cancelInput := context .WithCancel (context .Background ())
881+ env .startInput (ctx , inp )
882+
883+ env .waitUntilEventCount (3 )
884+
885+ env .requireOffsetInRegistry (testlogName , len (lines ))
886+
887+ // remove symlink
888+ env .mustRemoveFile (symlinkName )
889+ env .mustTruncateFile (testlogName , 0 )
890+ env .waitUntilOffsetInRegistry (testlogName , 0 )
891+
892+ // recreate symlink
893+ env .mustSymlink (testlogName , symlinkName )
894+
895+ moreLines := []byte ("forth line\n fifth line\n " )
896+ env .mustWriteLinesToFile (testlogName , moreLines )
897+
898+ env .waitUntilEventCount (5 )
899+ env .requireOffsetInRegistry (testlogName , len (moreLines ))
900+
901+ cancelInput ()
902+ env .waitUntilInputStops ()
903+
904+ env .requireRegistryEntryCount (1 )
905+ }
0 commit comments