Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions examples/typescript/cp/cp-example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,11 @@ const container = 'nginx';
const srcPath = './test.txt';
const targetPath = '/tmp';

// Simple copy without retries (default behavior)
await cp.cpFromPod(namespace, pod, container, srcPath, targetPath);

// For large files or unreliable connections, use retries:
// - maxTries > 0: Retry up to N times
// - maxTries < 0: Retry indefinitely
// await cp.cpFromPod(namespace, pod, container, srcPath, targetPath, undefined, 10);
// await cp.cpToPod(namespace, pod, container, srcPath, targetPath, 10);
284 changes: 246 additions & 38 deletions src/cp.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,127 @@
import { PassThrough, Readable } from 'stream';
import { WritableStreamBuffer } from 'stream-buffers';
import tar from 'tar-fs';

import { KubeConfig } from './config.js';
import { Exec } from './exec.js';

/**
* TarPipe wraps the tar stream from a pod with retry capabilities.
* When a read error occurs during transfer, it can resume from the last successful byte position.
*/
class TarPipe extends Readable {
private src: {
namespace: string;
podName: string;
containerName: string;
srcPath: string;
cwd?: string;
};
private execInstance: Exec;
private maxTries: number;
private bytesRead: number = 0;
private retries: number = 0;
private currentReader: Readable | null = null;
private errStream: WritableStreamBuffer | null = null;

constructor(
namespace: string,
podName: string,
containerName: string,
srcPath: string,
execInstance: Exec,
maxTries: number,
cwd?: string,
) {
super();
this.src = { namespace, podName, containerName, srcPath, cwd };
this.execInstance = execInstance;
this.maxTries = maxTries;
this.initReadFrom(0);
}

private initReadFrom(offset: number): void {
let command: string[];
if (this.maxTries !== 0 && offset > 0) {
// Use shell command with tail to resume from specific byte position
// tail -c+N is 1-indexed, so we add 1 to the 0-indexed offset
const tarCmd = this.src.cwd
? `tar cf - -C ${this.src.cwd} ${this.src.srcPath}`
: `tar cf - ${this.src.srcPath}`;
command = ['sh', '-c', `${tarCmd} | tail -c+${offset + 1}`];
} else {
command = ['tar', 'cf', '-'];
if (this.src.cwd) {
command.push('-C', this.src.cwd);
}
command.push(this.src.srcPath);
}

const writerStream = new PassThrough();

this.errStream = new WritableStreamBuffer();
this.currentReader = writerStream;

// Set up stream event handlers
writerStream.on('data', (chunk: Buffer) => {
this.bytesRead += chunk.length;
if (!this.push(chunk)) {
writerStream.pause();
}
});

writerStream.on('end', () => {
this.push(null); // Signal end of stream
});

writerStream.on('error', (err: Error) => {
this.handleError(err);
});

// Start exec
this.execInstance
.exec(
this.src.namespace,
this.src.podName,
this.src.containerName,
command,
writerStream,
this.errStream,
null,
false,
async () => {
if (this.errStream && this.errStream.size()) {
const errMsg = this.errStream.getContentsAsString();
this.handleError(new Error(`Error from pod - details: \n ${errMsg}`));
}
},
)
.catch((err: Error) => {
this.handleError(err);
});
}

private handleError(err: Error): void {
if (this.maxTries < 0 || this.retries < this.maxTries) {
this.retries++;
console.error(
`Resuming copy at ${this.bytesRead} bytes, retry ${this.retries}/${this.maxTries < 0 ? '∞' : this.maxTries}`,
);
// Resume from the current byte position (bytesRead is the next byte to read)
this.initReadFrom(this.bytesRead);
} else {
console.error(`Dropping out copy after ${this.retries} retries`);
this.destroy(err);
}
}

_read(): void {
if (this.currentReader) {
this.currentReader.resume();
}
}
}

export class Cp {
public execInstance: Exec;
public constructor(config: KubeConfig, execInstance?: Exec) {
Expand All @@ -17,6 +135,9 @@ export class Cp {
* @param {string} srcPath - The source path in the pod
* @param {string} tgtPath - The target path in local
* @param {string} [cwd] - The directory that is used as the parent in the pod when downloading
* @param {number} [maxTries=0] - Set number of retries to complete a copy operation from a container.
* Specify 0 to disable or any negative value for infinite retrying.
* The default is 0 (no retry).
*/
public async cpFromPod(
namespace: string,
Expand All @@ -25,29 +146,63 @@ export class Cp {
srcPath: string,
tgtPath: string,
cwd?: string,
maxTries: number = 0,
): Promise<void> {
const command = ['tar', 'cf', '-'];
if (cwd) {
command.push('-C', cwd);
if (maxTries === 0) {
// Original implementation without retry
const command = ['tar', 'cf', '-'];
if (cwd) {
command.push('-C', cwd);
}
command.push(srcPath);
const writerStream = tar.extract(tgtPath);
const errStream = new WritableStreamBuffer();
await this.execInstance.exec(
namespace,
podName,
containerName,
command,
writerStream,
errStream,
null,
false,
async () => {
if (errStream.size()) {
throw new Error(
`Error from cpFromPod - details: \n ${errStream.getContentsAsString()}`,
);
}
},
);
} else {
// Implementation with retry using TarPipe
const tarPipe = new TarPipe(
namespace,
podName,
containerName,
srcPath,
this.execInstance,
maxTries,
cwd,
);
const writerStream = tar.extract(tgtPath);

return new Promise<void>((resolve, reject) => {
tarPipe.on('error', (err: Error) => {
reject(err);
});

writerStream.on('error', (err: Error) => {
reject(err);
});

writerStream.on('finish', () => {
resolve();
});

tarPipe.pipe(writerStream);
});
}
command.push(srcPath);
const writerStream = tar.extract(tgtPath);
const errStream = new WritableStreamBuffer();
this.execInstance.exec(
namespace,
podName,
containerName,
command,
writerStream,
errStream,
null,
false,
async () => {
if (errStream.size()) {
throw new Error(`Error from cpFromPod - details: \n ${errStream.getContentsAsString()}`);
}
},
);
}

/**
Expand All @@ -56,31 +211,84 @@ export class Cp {
* @param {string} containerName - The name of the container in the pod to exec the command inside.
* @param {string} srcPath - The source path in local
* @param {string} tgtPath - The target path in the pod
* @param {number} [maxTries=0] - Set number of retries to complete a copy operation to a container.
* Specify 0 to disable or any negative value for infinite retrying.
* The default is 0 (no retry).
*/
public async cpToPod(
namespace: string,
podName: string,
containerName: string,
srcPath: string,
tgtPath: string,
maxTries: number = 0,
): Promise<void> {
const command = ['tar', 'xf', '-', '-C', tgtPath];
const readStream = tar.pack(srcPath);
const errStream = new WritableStreamBuffer();
this.execInstance.exec(
namespace,
podName,
containerName,
command,
null,
errStream,
readStream,
false,
async () => {
if (errStream.size()) {
throw new Error(`Error from cpToPod - details: \n ${errStream.getContentsAsString()}`);
if (maxTries === 0) {
// Original implementation without retry
const command = ['tar', 'xf', '-', '-C', tgtPath];
const readStream = tar.pack(srcPath);
const errStream = new WritableStreamBuffer();
await this.execInstance.exec(
namespace,
podName,
containerName,
command,
null,
errStream,
readStream,
false,
async () => {
if (errStream.size()) {
throw new Error(
`Error from cpToPod - details: \n ${errStream.getContentsAsString()}`,
);
}
},
);
} else {
// Implementation with retry
let retries = 0;
let lastError: Error | null = null;

while (maxTries < 0 || retries <= maxTries) {
try {
const command = ['tar', 'xf', '-', '-C', tgtPath];
const readStream = tar.pack(srcPath);
const errStream = new WritableStreamBuffer();

await this.execInstance.exec(
namespace,
podName,
containerName,
command,
null,
errStream,
readStream,
false,
async () => {
if (errStream.size()) {
throw new Error(
`Error from cpToPod - details: \n ${errStream.getContentsAsString()}`,
);
}
},
);
// Success - exit the retry loop
return;
} catch (err) {
lastError = err as Error;
if (maxTries < 0 || retries < maxTries) {
retries++;
console.error(
`Retrying cpToPod, attempt ${retries}/${maxTries < 0 ? '∞' : maxTries}: ${lastError.message}`,
);
} else {
break;
}
}
},
);
}

throw new Error(`cpToPod failed after ${retries} retries: ${lastError?.message}`);
}
}
}
38 changes: 38 additions & 0 deletions src/cp_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,43 @@ describe('Cp', () => {
await cp.cpToPod(namespace, pod, container, srcPath, tgtPath);
verify(fakeWebSocketInterface.connect(`${path}?${queryStr}`, null, anyFunction())).called();
});

it('should run extract tar command with retries parameter', async () => {
const kc = new KubeConfig();
const fakeWebSocketInterface: WebSocketInterface = mock(WebSocketHandler);
const fakeWebSocket: WebSocket.WebSocket = mock(WebSocket) as WebSocket.WebSocket;
const callAwaiter: CallAwaiter = new CallAwaiter();
const exec = new Exec(kc, instance(fakeWebSocketInterface));
const cp = new Cp(kc, exec);

const namespace = 'somenamespace';
const pod = 'somepod';
const container = 'container';
const srcPath = 'testdata/archive.txt';
const tgtPath = '/';
const maxTries = 3;
const cmdArray = ['tar', 'xf', '-', '-C', tgtPath];
const path = `/api/v1/namespaces/${namespace}/pods/${pod}/exec`;

const query = {
stdout: false,
stderr: true,
stdin: true,
tty: false,
command: cmdArray,
container,
};
const queryStr = querystring.stringify(query);

const fakeConn: WebSocket.WebSocket = instance(fakeWebSocket);
when(fakeWebSocketInterface.connect(`${path}?${queryStr}`, null, anyFunction())).thenResolve(
fakeConn,
);
when(fakeWebSocket.send(anything())).thenCall(callAwaiter.resolveCall('send'));
when(fakeWebSocket.close()).thenCall(callAwaiter.resolveCall('close'));

await cp.cpToPod(namespace, pod, container, srcPath, tgtPath, maxTries);
verify(fakeWebSocketInterface.connect(`${path}?${queryStr}`, null, anyFunction())).called();
});
});
});
Loading