Skip to content

Commit a9279cd

Browse files
authored
Add tests for truncated and symlinked files in filestream input (elastic#24425)
1 parent 1d858bd commit a9279cd

1 file changed

Lines changed: 167 additions & 0 deletions

File tree

filebeat/input/filestream/input_integration_test.go

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"context"
2525
"os"
2626
"runtime"
27+
"strconv"
2728
"testing"
2829
"time"
2930

@@ -736,3 +737,169 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) {
736737
cancelInput()
737738
env.waitUntilInputStops()
738739
}
740+
741+
// test_symlinks_enabled from test_harvester.py
742+
func TestFilestreamSymlinksEnabled(t *testing.T) {
743+
env := newInputTestingEnvironment(t)
744+
745+
testlogName := "test.log"
746+
symlinkName := "test.log.symlink"
747+
inp := env.mustCreateInput(map[string]interface{}{
748+
"paths": []string{
749+
env.abspath(symlinkName),
750+
},
751+
"prospector.scanner.symlinks": "true",
752+
})
753+
754+
testlines := []byte("first line\n")
755+
env.mustWriteLinesToFile(testlogName, testlines)
756+
757+
env.mustSymlink(testlogName, symlinkName)
758+
759+
ctx, cancelInput := context.WithCancel(context.Background())
760+
env.startInput(ctx, inp)
761+
762+
env.waitUntilEventCount(1)
763+
764+
cancelInput()
765+
env.waitUntilInputStops()
766+
767+
env.requireOffsetInRegistry(testlogName, len(testlines))
768+
}
769+
770+
// test_symlink_rotated from test_harvester.py
771+
func TestFilestreamSymlinkRotated(t *testing.T) {
772+
env := newInputTestingEnvironment(t)
773+
774+
firstTestlogName := "test1.log"
775+
secondTestlogName := "test2.log"
776+
symlinkName := "test.log"
777+
inp := env.mustCreateInput(map[string]interface{}{
778+
"paths": []string{
779+
env.abspath(symlinkName),
780+
},
781+
"prospector.scanner.check_interval": "1ms",
782+
"prospector.scanner.symlinks": "true",
783+
"close.on_state_change.removed": "false",
784+
"clean_removed": "false",
785+
})
786+
787+
commonLine := "first line in file "
788+
for i, path := range []string{firstTestlogName, secondTestlogName} {
789+
env.mustWriteLinesToFile(path, []byte(commonLine+strconv.Itoa(i)+"\n"))
790+
}
791+
792+
env.mustSymlink(firstTestlogName, symlinkName)
793+
794+
ctx, cancelInput := context.WithCancel(context.Background())
795+
env.startInput(ctx, inp)
796+
797+
env.waitUntilEventCount(1)
798+
799+
expectedOffset := len(commonLine) + 2
800+
env.requireOffsetInRegistry(firstTestlogName, expectedOffset)
801+
802+
// rotate symlink
803+
env.mustRemoveFile(symlinkName)
804+
env.mustSymlink(secondTestlogName, symlinkName)
805+
806+
moreLines := "second line in file 2\nthird line in file 2\n"
807+
env.mustAppendLinesToFile(secondTestlogName, []byte(moreLines))
808+
809+
env.waitUntilEventCount(4)
810+
env.requireOffsetInRegistry(firstTestlogName, expectedOffset)
811+
env.requireOffsetInRegistry(secondTestlogName, expectedOffset+len(moreLines))
812+
813+
cancelInput()
814+
env.waitUntilInputStops()
815+
816+
env.requireRegistryEntryCount(2)
817+
}
818+
819+
// test_symlink_removed from test_harvester.py
820+
func TestFilestreamSymlinkRemoved(t *testing.T) {
821+
env := newInputTestingEnvironment(t)
822+
823+
testlogName := "test.log"
824+
symlinkName := "test.log.symlink"
825+
inp := env.mustCreateInput(map[string]interface{}{
826+
"paths": []string{
827+
env.abspath(symlinkName),
828+
},
829+
"prospector.scanner.check_interval": "1ms",
830+
"prospector.scanner.symlinks": "true",
831+
"close.on_state_change.removed": "false",
832+
"clean_removed": "false",
833+
})
834+
835+
line := []byte("first line\n")
836+
env.mustWriteLinesToFile(testlogName, line)
837+
838+
env.mustSymlink(testlogName, symlinkName)
839+
840+
ctx, cancelInput := context.WithCancel(context.Background())
841+
env.startInput(ctx, inp)
842+
843+
env.waitUntilEventCount(1)
844+
845+
env.requireOffsetInRegistry(testlogName, len(line))
846+
847+
// remove symlink
848+
env.mustRemoveFile(symlinkName)
849+
850+
env.mustAppendLinesToFile(testlogName, line)
851+
852+
env.waitUntilEventCount(2)
853+
env.requireOffsetInRegistry(testlogName, 2*len(line))
854+
855+
cancelInput()
856+
env.waitUntilInputStops()
857+
858+
env.requireRegistryEntryCount(1)
859+
}
860+
861+
// test_truncate from test_harvester.py
862+
func TestFilestreamTruncate(t *testing.T) {
863+
env := newInputTestingEnvironment(t)
864+
865+
testlogName := "test.log"
866+
symlinkName := "test.log.symlink"
867+
inp := env.mustCreateInput(map[string]interface{}{
868+
"paths": []string{
869+
env.abspath("*"),
870+
},
871+
"prospector.scanner.check_interval": "1ms",
872+
"prospector.scanner.symlinks": "true",
873+
})
874+
875+
lines := []byte("first line\nsecond line\nthird line\n")
876+
env.mustWriteLinesToFile(testlogName, lines)
877+
878+
env.mustSymlink(testlogName, symlinkName)
879+
880+
ctx, cancelInput := context.WithCancel(context.Background())
881+
env.startInput(ctx, inp)
882+
883+
env.waitUntilEventCount(3)
884+
885+
env.requireOffsetInRegistry(testlogName, len(lines))
886+
887+
// remove symlink
888+
env.mustRemoveFile(symlinkName)
889+
env.mustTruncateFile(testlogName, 0)
890+
env.waitUntilOffsetInRegistry(testlogName, 0)
891+
892+
// recreate symlink
893+
env.mustSymlink(testlogName, symlinkName)
894+
895+
moreLines := []byte("forth line\nfifth line\n")
896+
env.mustWriteLinesToFile(testlogName, moreLines)
897+
898+
env.waitUntilEventCount(5)
899+
env.requireOffsetInRegistry(testlogName, len(moreLines))
900+
901+
cancelInput()
902+
env.waitUntilInputStops()
903+
904+
env.requireRegistryEntryCount(1)
905+
}

0 commit comments

Comments
 (0)